In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
import numpy as np

# Crear nuestra configuración
conf = SparkConf().setMaster('local').setAppName('Mi programa')
# Generar los contextos
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

# Cargar el dataframe
dfspark = sqlContext.read.format('csv').option('header','true').option('inferSchema','true').load('./base_datos_2008.csv',nrows=1e6)
# Seleccionar una pequeña fracción de los datos
dfspark = dfspark.sample(fraction=0.001,withReplacement=False)
# Transformar nuestra columna de interés (retraso en los vuelos), se transformará en un entero
dfspark = dfspark.withColumn('ArrDelay', dfspark['ArrDelay'].cast('integer'))

# Eliminar los datos faltantes
df2 = dfspark.na.drop(subset=['ArrDelay','DepDelay','Distance'])
# Eliminar los que estén clasificados como NULL
df2 = df2.filter('ArrDelay is not NULL')
df2 = df2.dropDuplicates()

In [2]:
# Filtrar nuestros datos dado un valor numérico
# En formato texto se coloca el nombre de la columna que queremos filtrar, se selecciona aquellos valores que superen los 60 minutos
# Para indicar que solo queremos los valores se usa el comando select
# Con take se pide que nos de 5
# con .take(1)[0] Nos da exactamente el valor
df2.select('ArrDelay').filter('ArrDelay > 60').take(5)

[Row(ArrDelay=107),
 Row(ArrDelay=123),
 Row(ArrDelay=96),
 Row(ArrDelay=73),
 Row(ArrDelay=178)]

In [3]:
# Si queremos cada una de las filas del data frame se ignora .select
df2.filter("ArrDelay > 60").take(5)


[Row(Year=2008, Month=4, DayofMonth=11, DayOfWeek=5, DepTime='835', CRSDepTime=730, ArrTime='1159', CRSArrTime=1012, UniqueCarrier='CO', FlightNum=347, TailNum='N17345', ActualElapsedTime='204', CRSElapsedTime='162', AirTime='146', ArrDelay=107, DepDelay='65', Origin='ORD', Dest='IAH', Distance=925, TaxiIn='14', TaxiOut='44', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='65', WeatherDelay='0', NASDelay='42', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=5, DayofMonth=31, DayOfWeek=6, DepTime='2032', CRSDepTime=1825, ArrTime='2311', CRSArrTime=2108, UniqueCarrier='DL', FlightNum=1874, TailNum='N664DN', ActualElapsedTime='159', CRSElapsedTime='163', AirTime='135', ArrDelay=123, DepDelay='127', Origin='MCO', Dest='BDL', Distance=1050, TaxiIn='7', TaxiOut='17', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='123'),
 Row(Year=2008, Month=6, DayofMonth=27, DayOfWeek=5, D

In [4]:
# Crear un objeto, la media de la columna
# Indica que recoja todos los valores de esta columna y se le aplique el método
media = np.mean(df2.select('ArrDelay').collect())
# Se le aplica una función a la columna utilizando el objeto rdd que almacena la columna
# Se utiliza una función map que aplica una función lambda a cada uno de los elementos, calculando la distnacia al cuadrado de el valor menos la media
# Se muestran los primeros 10 casos
df2.select('ArrDelay').rdd.map(lambda x: (x - media)**2).take(10)

[array([1.81283774]),
 array([336.59100317]),
 array([87.35550383]),
 array([214.72750557]),
 array([861.21216904]),
 array([1417.79234058]),
 array([128.74117035]),
 array([861.21216904]),
 array([21.65583818]),
 array([1494.09950732])]

In [5]:
# En PySpark también se utiliza groupBy
# Se indica qué función descriptiva se requiere (.count() en este caso)
# .show() para mostrar los datos

df2.groupBy('DayOfWeek').count().show()

+---------+-----+
|DayOfWeek|count|
+---------+-----+
|        1| 1020|
|        6|  815|
|        3|  996|
|        5| 1061|
|        4|  966|
|        7| 1007|
|        2| 1014|
+---------+-----+



In [6]:
# Encontrar resúmenes más interesantes
# Se aplica otra función distinta, en este caso la media a otra columna del dataframe
# Muestra una tabla resumen de cada uno de los días de la semana con la función que se pide
df2.groupBy('DayOfWeek').mean('ArrDelay').show()

+---------+------------------+
|DayOfWeek|     avg(ArrDelay)|
+---------+------------------+
|        1| 6.337254901960784|
|        6| 5.298159509202454|
|        3| 5.809236947791165|
|        5|10.162111215834118|
|        4| 8.784679089026914|
|        7|10.804369414101291|
|        2|10.551282051282051|
+---------+------------------+



In [7]:
# Ver todos los origenes distintos que tenemos en nuestra base de datos
# Se seleccionan los datos
# Se selecciona el rdd asociado
# Se aplica el método .distinct(), este muestra una lista de todos los origenes distintos que se tienen en la base de datos
# Se muestran los primeros 5 (no están ordenados bajo ningún criterio)
df2.select('Origin').rdd.distinct().take(5)

[Row(Origin='PIT'),
 Row(Origin='FSM'),
 Row(Origin='SMF'),
 Row(Origin='SPI'),
 Row(Origin='CWA')]

In [8]:
# Similar al código anterior pero en este caso se utiliza count, que indicará cuántos índices distintos tenemos 
df2.select('Origin').rdd.distinct().count()

246

In [9]:
# Pedir que se ordene el DataFrame por alguna de las columnas que tengamos
# Se selecciona ArrDelay y pedir que lo haga de manera descendentes y toma las 5 primeras
df2.orderBy(df2.ArrDelay.desc()).take(5)
# Muestra los vuelos que más se han retrasado

[Row(Year=2008, Month=4, DayofMonth=15, DayOfWeek=2, DepTime='1529', CRSDepTime=625, ArrTime='1703', CRSArrTime=759, UniqueCarrier='MQ', FlightNum=3932, TailNum='N638AE', ActualElapsedTime='94', CRSElapsedTime='94', AirTime='71', ArrDelay=544, DepDelay='544', Origin='SGF', Dest='ORD', Distance=438, TaxiIn='11', TaxiOut='12', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='544', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=2, DayofMonth=6, DayOfWeek=3, DepTime='1641', CRSDepTime=810, ArrTime='1823', CRSArrTime=1005, UniqueCarrier='MQ', FlightNum=4253, TailNum='N908AE', ActualElapsedTime='102', CRSElapsedTime='115', AirTime='86', ArrDelay=498, DepDelay='511', Origin='ORD', Dest='ICT', Distance=588, TaxiIn='3', TaxiOut='13', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='498', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0'),
 Row(Year=2008, Month=10, DayofMonth=25, DayOfWeek=6, DepTi