In [2]:
import findspark

# Called before the pyspark imports below; findspark makes a local Spark
# installation importable from this kernel.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# NOTE: the wildcard import puts Spark's column functions (count, avg, desc, ...)
# into the notebook namespace. In particular `min`, `max` and `sum` used in
# later cells are the Spark aggregate functions, not the Python builtins.
from pyspark.sql.functions import *

In [6]:
# Read the flights CSV using its first row as header and letting Spark infer
# the column types, then print the resulting schema.
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('./data/flights.csv')
)
df.printSchema()



root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nullable = true)

                                                                                

### count

In [15]:
# Rebind `df` to the demo table (nombre, color, cantidad) used by the
# count examples and print its schema.
demo_parquet_path = './data/dataframe/dataframe.parquet'
df = spark.read.parquet(demo_parquet_path)
df.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- color: string (nullable = true)
 |-- cantidad: long (nullable = true)



In [17]:
# count(<column>) only counts the non-null values of that column.
conteo_nombre_expr = count('nombre').alias('conteo_nombre')
count_color_expr = count('color').alias('count_color')
df.select(conteo_nombre_expr, count_color_expr).show()

+-------------+-----------+
|conteo_nombre|count_color|
+-------------+-----------+
|            2|          3|
+-------------+-----------+



In [18]:
# count('*') counts every row, whereas count(<column>) skips rows where
# that column is null, so the three figures can differ.
conteo_exprs = [
    count('nombre').alias('conteo_nombre'),
    count('color').alias('count_color'),
    count('*').alias('conteo_general'),
]
df.select(*conteo_exprs).show()

+-------------+-----------+--------------+
|conteo_nombre|count_color|conteo_general|
+-------------+-----------+--------------+
|            2|          3|             4|
+-------------+-----------+--------------+



### countDistinct

In [19]:
# Number of different values found in the `color` column.
colores_dif_expr = countDistinct('color').alias('colores_dif')
df.select(colores_dif_expr).show()

+-----------+
|colores_dif|
+-----------+
|          2|
+-----------+



### approx_count_distinct

In [20]:
# Rebind `df` to the flights CSV (header row, inferred column types); the
# previous section had pointed it at the demo parquet table.
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('./data/flights.csv')
)

                                                                                

In [None]:
# Display the column names of the current `df` (the flights data loaded in the
# previous cell). This cell has no execution count in the saved notebook.
df.columns

In [24]:
# Compare the exact distinct count of airlines with Spark's approximate one;
# the approximation is allowed to deviate from the exact figure.
exact_airlines_expr = countDistinct('AIRLINE')
approx_airlines_expr = approx_count_distinct('AIRLINE')
df.select(exact_airlines_expr, approx_airlines_expr).show()



+-----------------------+------------------------------+
|count(DISTINCT AIRLINE)|approx_count_distinct(AIRLINE)|
+-----------------------+------------------------------+
|                     14|                            13|
+-----------------------+------------------------------+



                                                                                

### min y max

In [26]:
# Smallest and largest AIR_TIME over all flights.
# `min` / `max` here are the Spark aggregates from the wildcard import.
air_time_bounds = (
    min('AIR_TIME').alias('menor_tiempo'),
    max('AIR_TIME').alias('mayor_tiempo'),
)
df.select(*air_time_bounds).show()




+------------+------------+
|menor_tiempo|mayor_tiempo|
+------------+------------+
|           7|         690|
+------------+------------+



                                                                                

In [27]:
# Smallest and largest AIRLINE_DELAY over all flights.
# `min` / `max` here are the Spark aggregates from the wildcard import.
airline_delay_bounds = (
    min('AIRLINE_DELAY').alias('menor_tiempo'),
    max('AIRLINE_DELAY').alias('mayor_tiempo'),
)
df.select(*airline_delay_bounds).show()



+------------+------------+
|menor_tiempo|mayor_tiempo|
+------------+------------+
|           0|        1971|
+------------+------------+



                                                                                

### sum, sumDistinct y avg

In [28]:
# Total of the DISTANCE column across every flight row.
# `sum` is the Spark aggregate, not the Python builtin.
suma_dist_expr = sum('DISTANCE').alias('suma_dist')
df.select(suma_dist_expr).show()

[Stage 39:>                                                       (0 + 12) / 12]

+----------+
| suma_dist|
+----------+
|4785357409|
+----------+



                                                                                

In [31]:
# Sum of the *distinct* DISTANCE values (each different distance counted once).
# Uses sum_distinct: the camelCase sumDistinct is deprecated since Spark 3.2
# and emits a deprecation warning there. The wildcard import of
# pyspark.sql.functions already brings sum_distinct into scope.
df.select(
    sum_distinct('DISTANCE').alias('suma_dist_dif')
).show()



+-------------+
|suma_dist_dif|
+-------------+
|      1442300|
+-------------+



                                                                                

In [33]:
# Average AIR_TIME computed two ways: with avg() and by hand as
# sum / count of the same column. Both columns are shown side by side.
promedio_aire_expr = avg('AIR_TIME').alias('promedio_aire')
prom_manual_expr = (sum('AIR_TIME') / count('AIR_TIME')).alias('prom_manual')
df.select(promedio_aire_expr, prom_manual_expr).show()



+------------------+------------------+
|     promedio_aire|       prom_manual|
+------------------+------------------+
|113.51162809012519|113.51162809012519|
+------------------+------------------+



                                                                                

In [None]:
# Placeholder cell: no columns are selected yet and the cell was never run.
# TODO: fill in the intended aggregation or remove this cell.
df.select().show()

### Agregación y agrupación

In [34]:
# Load the flights CSV (header row, inferred column types) for the
# grouping examples that follow.
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('./data/flights.csv')
)

                                                                                

In [35]:
# Number of flight rows per origin airport, busiest first.
vuelos_por_origen = df.groupBy('ORIGIN_AIRPORT').count()
vuelos_por_origen.orderBy(desc('count')).show()



+--------------+------+
|ORIGIN_AIRPORT| count|
+--------------+------+
|           ATL|346836|
|           ORD|285884|
|           DFW|239551|
|           DEN|196055|
|           LAX|194673|
|           SFO|148008|
|           PHX|146815|
|           IAH|146622|
|           LAS|133181|
|           MSP|112117|
|           MCO|110982|
|           SEA|110899|
|           DTW|108500|
|           BOS|107847|
|           EWR|101772|
|           CLT|100324|
|           LGA| 99605|
|           SLC| 97210|
|           JFK| 93811|
|           BWI| 86079|
+--------------+------+
only showing top 20 rows



                                                                                

In [36]:
# Number of flight rows per (origin, destination) pair, busiest route first.
vuelos_por_ruta = df.groupBy('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT').count()
vuelos_por_ruta.orderBy(desc('count')).show()



+--------------+-------------------+-----+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|count|
+--------------+-------------------+-----+
|           SFO|                LAX|13744|
|           LAX|                SFO|13457|
|           JFK|                LAX|12016|
|           LAX|                JFK|12015|
|           LAS|                LAX| 9715|
|           LGA|                ORD| 9639|
|           LAX|                LAS| 9594|
|           ORD|                LGA| 9575|
|           SFO|                JFK| 8440|
|           JFK|                SFO| 8437|
|           OGG|                HNL| 8313|
|           HNL|                OGG| 8282|
|           LAX|                ORD| 8256|
|           ATL|                LGA| 8234|
|           LGA|                ATL| 8215|
|           ATL|                MCO| 8202|
|           MCO|                ATL| 8202|
|           SFO|                LAS| 7995|
|           ORD|                LAX| 7941|
|           LAS|                SFO| 7870|
+--------------+-------------------+-----+

                                                                                

### Varias agregaciones

In [37]:
# Several aggregations per origin airport in a single pass.
# Note: 'tiempo_aire' holds the COUNT of non-null AIR_TIME values, not a duration.
air_time_por_origen = df.groupBy('ORIGIN_AIRPORT').agg(
    count('AIR_TIME').alias('tiempo_aire'),
    min('AIR_TIME').alias('min'),
    max('AIR_TIME').alias('max'),
)
air_time_por_origen.orderBy(desc('tiempo_aire')).show()

[Stage 74:>                                                       (0 + 12) / 12]

+--------------+-----------+---+---+
|ORIGIN_AIRPORT|tiempo_aire|min|max|
+--------------+-----------+---+---+
|           ATL|     343506| 15|614|
|           ORD|     276554| 13|571|
|           DFW|     232647| 11|534|
|           DEN|     193402| 12|493|
|           LAX|     192003| 14|409|
|           PHX|     145552| 19|444|
|           SFO|     145491|  8|389|
|           IAH|     144019| 15|524|
|           LAS|     131937| 25|429|
|           MSP|     111055| 14|537|
|           SEA|     110178| 17|412|
|           MCO|     109532| 25|395|
|           DTW|     106992| 15|341|
|           BOS|     104804| 16|432|
|           CLT|      99052| 17|379|
|           EWR|      98341| 21|683|
|           SLC|      96505| 18|419|
|           LGA|      94834| 19|311|
|           JFK|      91663| 29|690|
|           BWI|      84329| 19|398|
+--------------+-----------+---+---+
only showing top 20 rows



                                                                                

In [38]:
# Per month: how many rows have a non-null ARRIVAL_DELAY, and the mean DISTANCE.
# Months with the highest count come first.
resumen_mensual = df.groupBy('MONTH').agg(
    count('ARRIVAL_DELAY').alias('conteo_retrasos'),
    avg('DISTANCE').alias('prom_dist'),
)
resumen_mensual.orderBy(desc('conteo_retrasos')).show()



+-----+---------------+-----------------+
|MONTH|conteo_retrasos|        prom_dist|
+-----+---------------+-----------------+
|    7|         514384|841.4772794487611|
|    8|         503956|834.8244276603413|
|    6|         492847|835.6302716626612|
|    3|         492138|816.0553268611494|
|    5|         489641|823.3230588760807|
|   10|         482878|816.4436127652134|
|    4|         479251|817.0060476016745|
|   12|         469717|837.8018926194103|
|   11|         462367|820.2482434846529|
|    9|         462153|815.8487523282274|
|    1|         457013|803.2612794913696|
|    2|         407663| 800.785449834689|
+-----+---------------+-----------------+



                                                                                

### Agregación con pivote

In [39]:
# Load the students table used by the pivot examples and display it.
estudiantes_path = './data/estudiantes.parquet'
estudiantes = spark.read.parquet(estudiantes_path)
estudiantes.show()

+------+----+----+----------+
|nombre|sexo|peso|graduacion|
+------+----+----+----------+
|  Jose|   M|  80|      2000|
| Hilda|   F|  50|      2000|
|  Juan|   M|  75|      2000|
| Pedro|   M|  76|      2001|
|Katia+|   F|  65|      2001|
+------+----+----+----------+



In [40]:
# One row per graduation year, one column per value of `sexo`,
# each cell holding the average `peso` of that group.
pivot_por_sexo = estudiantes.groupBy('graduacion').pivot('sexo')
pivot_por_sexo.agg(avg('peso')).show()

+----------+----+----+
|graduacion|   F|   M|
+----------+----+----+
|      2001|65.0|76.0|
|      2000|50.0|77.5|
+----------+----+----+



In [41]:
# Same pivot with three aggregations; Spark names the output columns
# <pivot value>_<aggregation>, e.g. F_avg(peso).
pivot_sexo_todos = estudiantes.groupBy('graduacion').pivot('sexo')
pivot_sexo_todos.agg(
    avg('peso'),
    min('peso'),
    max('peso'),
).show()

+----------+-----------+-----------+-----------+-----------+-----------+-----------+
|graduacion|F_avg(peso)|F_min(peso)|F_max(peso)|M_avg(peso)|M_min(peso)|M_max(peso)|
+----------+-----------+-----------+-----------+-----------+-----------+-----------+
|      2001|       65.0|         65|         65|       76.0|         76|         76|
|      2000|       50.0|         50|         50|       77.5|         75|         80|
+----------+-----------+-----------+-----------+-----------+-----------+-----------+



In [42]:
# Passing an explicit value list to pivot() limits the output to those
# values: only the F_* columns are produced here.
pivot_solo_f = estudiantes.groupBy('graduacion').pivot('sexo', ['F'])
pivot_solo_f.agg(
    avg('peso'),
    min('peso'),
    max('peso'),
).show()

+----------+-----------+-----------+-----------+
|graduacion|F_avg(peso)|F_min(peso)|F_max(peso)|
+----------+-----------+-----------+-----------+
|      2001|       65.0|         65|         65|
|      2000|       50.0|         50|         50|
+----------+-----------+-----------+-----------+

