In [7]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
import numpy as np

spark = SparkSession.builder.getOrCreate()
print(spark)


<pyspark.sql.session.SparkSession object at 0x7f01430cec10>


In [8]:
flights_path = 'datasets/flights_small.csv'
airports_path = 'datasets/airports.csv'
planes_path = 'datasets/planes.csv'

In [9]:
flights_pandas = pd.read_csv(flights_path)
flights_pandas.head(3)

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658.0,-7.0,935.0,-5.0,VX,N846VA,1780,SEA,LAX,132.0,954,6.0,58.0
1,2014,1,22,1040.0,5.0,1505.0,5.0,AS,N559AS,851,SEA,HNL,360.0,2677,10.0,40.0
2,2014,3,9,1443.0,-2.0,1652.0,2.0,VX,N847VA,755,SEA,SFO,111.0,679,14.0,43.0


In [10]:
'''
Movendo um DataFrame Pandas com 10 linhas para um Cluster Spark
'''
flights_pandas_10 = flights_pandas.head(10)
flights_spark_10 = spark.createDataFrame(flights_pandas_10)
flights_spark_10.show(3)
#flights_spark_10.createOrReplaceGlobalTempView("temp")
#print(spark.catalog.listTables())

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|   658.0|     -7.0|   935.0|     -5.0|     VX| N846VA|  1780|   SEA| LAX|   132.0|     954| 6.0|  58.0|
|2014|    1| 22|  1040.0|      5.0|  1505.0|      5.0|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|10.0|  40.0|
|2014|    3|  9|  1443.0|     -2.0|  1652.0|      2.0|     VX| N847VA|   755|   SEA| SFO|   111.0|     679|14.0|  43.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 3 rows



In [11]:
'''
Movendo um Cluster Spark para um DataFrame Pandas
'''
flights_pandas_10 = flights_spark_10.toPandas()
flights_pandas_10.head(3)

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658.0,-7.0,935.0,-5.0,VX,N846VA,1780,SEA,LAX,132.0,954,6.0,58.0
1,2014,1,22,1040.0,5.0,1505.0,5.0,AS,N559AS,851,SEA,HNL,360.0,2677,10.0,40.0
2,2014,3,9,1443.0,-2.0,1652.0,2.0,VX,N847VA,755,SEA,SFO,111.0,679,14.0,43.0


In [12]:
'''
Lendo um arquivo csv diretamente no spark
'''
airports = spark.read.csv(airports_path, header=True)
airports.show(3)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 3 rows



**Caso tenha uma tabela no catálogo/temp
flights = spark.table("flights")
flights.show()
flights = flights.withColumn("duration_hrs", flights.air_time/60)**

In [13]:
flights = spark.read.csv(flights_path, header=True)
flights = flights.withColumn("duration_hrs", flights.air_time/60)
flights.show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|         2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|        1.85|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 3 rows



In [14]:
'''
O método Filter funciona como o método WHERE do SQL e tem duas formas de passagem de parâmetros, como string ou como sintaxe df.colName
'''
long_flights_1 = flights.filter("distance > 1000")
long_flights_2 = flights.filter(flights.distance > 1000)
long_flights_1.show(1)
long_flights_2.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+-

In [15]:
'''
Filtrando as colunas origin e dest apenas que contém os valores SEA e PDX respectivamente
'''
flights_selected_1 = flights.select("tailnum", "origin", "dest")
flights_temp = flights.select(flights.origin, flights.dest, flights.carrier)
flights_selected_1.show(3)
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"
flights_selected_2 = flights_temp.filter(filterA).filter(filterB)
flights_selected_2.show(3)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
+-------+------+----+
only showing top 3 rows

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 3 rows



In [16]:
'''
Utiizando alias para gerar uma nova coluna coluna ao dataframe
'''
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
speed_1 = flights.select("origin", "dest", "tailnum", "air_time", avg_speed)
speed_2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
flights.select((flights.distance/(flights.air_time/60)).alias("duration_hrs")).show(3)
flights.selectExpr("air_time/60 as duration_hrs").show(3)
flights.select("origin", "dest", "tailnum", avg_speed).show(3)
flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed").show(3)

+------------------+
|      duration_hrs|
+------------------+
| 433.6363636363636|
| 446.1666666666667|
|367.02702702702703|
+------------------+
only showing top 3 rows

+------------+
|duration_hrs|
+------------+
|         2.2|
|         6.0|
|        1.85|
+------------+
only showing top 3 rows

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
+------+----+-------+------------------+
only showing top 3 rows

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
+------+----+-------+------------------+
only showing top 3 rows



In [17]:
'''
Utiizando ao método "col" para converter as colunas em double.
groupBy para agrupar pelo filtro especificado
'''
flights = flights.withColumn("distance", col("distance").cast("double"))
flights = flights.withColumn("air_time", col("air_time").cast("double"))
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

+-------------+
|min(distance)|
+-------------+
|        106.0|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|        409.0|
+-------------+



In [18]:
flights_by_plane = flights.groupBy("tailnum")
flights_by_plane.count().show(3)
flights_by_origin = flights.groupBy("origin")
flights_by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
+-------+-----+
only showing top 3 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [19]:
'''
Utiizando ao método "col" para converter a coluna dep_delay em double
método agg da biblioteca pyspark.sql.functions para encontrar o desvio padrão
'''
flights = flights.withColumn("dep_delay", col("dep_delay").cast("double"))
flights_by_month_dest = flights.groupBy("month", "dest")

flights_by_month_dest.avg("dep_delay").show(3)

flights_by_month_dest.agg(F.stddev("dep_delay")).show(3)

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|   11| TUS|-2.3333333333333335|
|   11| ANC|  7.529411764705882|
|    1| BUR|              -1.45|
+-----+----+-------------------+
only showing top 3 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|    18.604716401245316|
|    1| BUR|     15.22627576540667|
+-----+----+----------------------+
only showing top 3 rows



In [20]:
'''
Trabalhando com o método join 
primeiro parâmetro o dataframe que será agregado;
segundo parâmetro a coluna chave;
terceiro parâmetro "leftouter".
'''
print(airports.show(3))
airports = airports.withColumnRenamed("faa", "dest")
flights_with_airports = flights.join(airports, "dest", "leftouter")
print(flights_with_airports.show(3))

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 3 rows

None
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------+------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|duration_hrs|              name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------+------------------+------

In [21]:
'''
Método withColumnRenamed 1° parâmetro o nome da coluna, 2° parâmetro o novo nome
'''
planes = spark.read.csv(planes_path, header=True)
planes = planes.withColumnRenamed("year", "plane_year")
model_data = flights.join(planes, on="tailnum", how="leftouter")
model_data.show(3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|     -7.0|     935|       -5|     VX|  1780|   SEA| LAX|   132.0|   954.0|   6|    58|         2.2|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|      5.0|    1505|        5|     AS|   851|   SEA| HNL|   360.0|  2677.0|  10|    40|         6.0|

In [22]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

In [23]:
'''
Criando uma nova coluna e passando como valor a subtração da coluna year por plane_year
'''
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)
model_data.show(3)

23/10/22 15:21:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
| N846VA|2014|   12|  8|     658|     -7.0|     935|       -5|     VX|  1780|   SEA| LAX|     132|   954.0|   6|    58|         2.2|      2

In [24]:
'''
Utilizando o método withColumn pare gerar duas novas colunas ao model_data
Coluna is_late que será booleana (verdadeiro para valores de arr_delay maiores que 0)
Coluna late que é uma conversão da coluna is_late para inteiro (somente 0 ou 1)
Método filter para verificar as colunas arr_delay, dep_delaye air_time e filtrar apenas pelas que não possuem valores nulos
'''
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
model_data.show(3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|     -7.0|     935|       -5|     VX|  1780|   SEA| LAX|     132|   954.0|   6|    58|         2.2|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|    0|
| N559AS|2014|    1| 22|    1040|   

In [25]:
'''
Método StringIndexer atribui um valor numérico a cada um dos valores categóricos com base na ordem de ocorrência
Método OneHotEncoder:
#primeiro valor corresponde ao total de categorias únicas
#segundo valor corresponde ao StringIndexer da categoria
#terceiro valor é binário, 1 para presença da categoria e 0 para ausência
VectorAssembler: combina todas as colunas númericas em uma única coluna, minimiza o uso de memória e acelera o processamento
O valor 81 indica o tamanho total do vetor.
Etapas de um pipeline
'''
carr_indexer = StringIndexer(inputCol = "carrier", outputCol = "carrier_index")
carr_encoder = OneHotEncoder(inputCol = "carrier_index", outputCol = "carrier_fact")

dest_indexer = StringIndexer(inputCol = "dest", outputCol = "dest_index")
dest_encoder = OneHotEncoder(inputCol = "dest_index", outputCol = "dest_fact")

vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

In [26]:
'''
Etapas do pipeline, define a sequência de transformações a serem aplicadas aos dados
piped_data ajusta o pipeline aos dados de treinamento e em seguida aplica transformações nesses dados
'''
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
piped_data = flights_pipe.fit(model_data).transform(model_data)
piped_data.show(3)

training, test = piped_data.randomSplit([.6, .4])
'''
dividindo dados transformados piped_data em conjuntos de treinamento e teste 60%, 40%
'''

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
| N846VA|2014|   12|  8|     658|     -7.0|   

'\ndividindo dados transformados piped_data em conjuntos de treinamento e teste 60%, 40%\n'

In [27]:
'''
evals é o alias da biblioteca pyspark.ml.evaluation
tune é o alias da biblioteca pyspark.ml.tuning
'''
lr = LogisticRegression() #criando instância modelo de Regressão Logística. Modelo de machine learning para ajustar e otimizar
evaluator = evals.BinaryClassificationEvaluator(metricName = "areaUnderROC")

grid = tune.ParamGridBuilder() #criando grade de hiperparâmetros
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01)) #testar valores entre 0 à 0.1 com incrementos de 0.01 
grid = grid.addGrid(lr.elasticNetParam, [0, 1]) #0 (apenas L2) e 1 (apenas L1)
grid = grid.build() #finalizando a construção da grade de hiperparâmetros

cv = tune.CrossValidator(estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator)
'''
criando objeto CrossValidator para encontrar os melhores hiperparâmetros.
primeiro argumento especifica o modelo a ser otimizado
segundo argumento especifica a grade de hiperparâmetros a serem testados
terceiro argumento especifica o avaliador a ser usado para medir a qualidade das previsões
'''

'\ncriando objeto CrossValidator para encontrar os melhores hiperparâmetros.\nprimeiro argumento especifica o modelo a ser otimizado\nsegundo argumento especifica a grade de hiperparâmetros a serem testados\nterceiro argumento especifica o avaliador a ser usado para medir a qualidade das previsões\n'

In [28]:
'''
Treinando o modelo de Regressão Logística best_lr utilizando o conjunto de treinamento para serem feitas previsões
test_results está fazendo previsões utilizando o modelo treinado no conjunto de teste
'''
best_lr = lr.fit(training)
print(best_lr)

test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))
test_results.show(3)

                                                                                

23/10/22 15:21:33 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
LogisticRegressionModel: uid=LogisticRegression_0db5b7a171a4, numClasses=2, numFeatures=81
0.7020610402465913
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+--------------------+--------------------+----------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|       rawPrediction|         probability|prediction|
+-------+----+-----+-