In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder\
                  .appName("analyzing airline data")\
                  .getOrCreate()

In [3]:
from pyspark.sql.types import Row
from datetime import datetime

In [5]:
record = sc.parallelize([Row(id=1,
                             name="Jill",
                             active=True,
                             clubs=['chees','hockey'],
                             subjects={"math":80,"english":56},
                             enrolled=datetime(2014,8,1,14,1,5)),
                         Row(id=2,
                             name="George",
                             active=False,
                             clubs=['chees','soccor'],
                             subjects={"math":60,"english":96},
                             enrolled=datetime(2015,3,21,8,2,5))
                        ])

In [6]:
record_df=record.toDF()
record_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chees, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chees, soccor]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



In [7]:
record_df.createOrReplaceTempView("records")

In [8]:
all_records_df=sqlContext.sql("SELECT * FROM records")
all_records_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chees, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chees, soccor]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



In [9]:
sqlContext.sql("select id,clubs[1],subjects['english'] from records").show()

+---+--------+-----------------+
| id|clubs[1]|subjects[english]|
+---+--------+-----------------+
|  1|  hockey|               56|
|  2|  soccor|               96|
+---+--------+-----------------+



In [10]:
sqlContext.sql("SELECT id, NOT active from records").show()

+---+------------+
| id|(NOT active)|
+---+------------+
|  1|       false|
|  2|        true|
+---+------------+



In [11]:
sqlContext.sql("SELECT *  from records where active").show()

+------+---------------+-------------------+---+----+--------------------+
|active|          clubs|           enrolled| id|name|            subjects|
+------+---------------+-------------------+---+----+--------------------+
|  true|[chees, hockey]|2014-08-01 14:01:05|  1|Jill|[english -> 56, m...|
+------+---------------+-------------------+---+----+--------------------+



In [12]:
sqlContext.sql('SELECT *  from records where subjects["english"] > 90 ').show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
| false|[chees, soccor]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



In [13]:
record_df.createGlobalTempView("global_records")

In [16]:
sqlContext.sql("SELECT * FROM global_temp.global_records").show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[chees, hockey]|2014-08-01 14:01:05|  1|  Jill|[english -> 56, m...|
| false|[chees, soccor]|2015-03-21 08:02:05|  2|George|[english -> 96, m...|
+------+---------------+-------------------+---+------+--------------------+



In [3]:
airlinesPath="/home/demo/airlines.csv"
flightsPath="/home/demo/flights.csv"
airportsPath="/home/demo/airports.csv"

In [4]:
airlines = spark.read\
                .format("csv")\
                .option("header","true")\
                .load(airlinesPath)

In [5]:
airlines.createOrReplaceTempView("airlines")

In [6]:
sqlContext.sql("SELECT * FROM airlines").show()

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
|19036|Golden West Airli...|
|19037|Puerto Rico Intl ...|
|19038|Air America Inc.:...|
|19039|Swift Aire Lines ...|
|19040|American Central ...|
|19041|Valdez Airlines: VEZ|
|19042|Southeast Alaska ...|
|19043|Altair Airlines I...|
|19044|Chitina Air Servi...|
|19045|Marco Island Airw...|
|19046|Caribbean Air Ser...|
|19047|Sundance Airlines...|
|19048|Seair Alaska Airl...|
|19049|Southeast Airline...|
|19050|Alaska Aeronautic...|
+-----+--------------------+
only showing top 20 rows



In [7]:
flights = spark.read\
               .format("csv")\
               .option("header","true")\
               .load(flightsPath)

In [8]:
flights.createOrReplaceTempView("flights")
flights.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [9]:
flights.show(5)

+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|      date|airlines|flight_number|origin|destination|departure|departure_delay|arrival|arrival_delay|air_time|distance|
+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|2014-04-01|   19805|            1|   JFK|        LAX|     0854|          -6.00|   1217|         2.00|  355.00| 2475.00|
|2014-04-01|   19805|            2|   LAX|        JFK|     0944|          14.00|   1736|       -29.00|  269.00| 2475.00|
|2014-04-01|   19805|            3|   JFK|        LAX|     1224|          -6.00|   1614|        39.00|  371.00| 2475.00|
|2014-04-01|   19805|            4|   LAX|        JFK|     1240|          25.00|   2028|       -27.00|  264.00| 2475.00|
|2014-04-01|   19805|            5|   DFW|        HNL|     1300|          -5.00|   1650|        15.00|  510.00| 3784.00|
+----------+--------+-----------

In [10]:
flights.count(),airlines.count()

(476881, 1579)

In [11]:
flights_count=spark.sql("SELECT COUNT(*) FROM flights")
airlines_count=spark.sql("SELECT COUNT(*) FROM airlines")

In [12]:
flights_count,airlines_count

(DataFrame[count(1): bigint], DataFrame[count(1): bigint])

In [13]:
flights_count.collect()[0][0],airlines_count.collect()[0][0]

(476881, 1579)

In [14]:
total_distance_df=spark.sql("SELECT distance FROM flights")\
                       .agg({"distance":"sum"})\
                       .withColumnRenamed("sum(distance)","total_distance")

In [15]:
total_distance_df.show()

+--------------+
|total_distance|
+--------------+
|  3.79052917E8|
+--------------+



In [16]:
all_delays_2012 = spark.sql( "SELECT date , airlines , flight_number ,departure_delay FROM flights where departure_delay > 0 and year(date) =2012")

In [17]:
all_delays_2012.show()

+----+--------+-------------+---------------+
|date|airlines|flight_number|departure_delay|
+----+--------+-------------+---------------+
+----+--------+-------------+---------------+



In [18]:
all_delays_2014 = spark.sql( "SELECT date , airlines , flight_number ,departure_delay FROM flights where departure_delay > 0 and year(date) =2014")

In [19]:
all_delays_2014.show(5)

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-01|   19805|            2|          14.00|
|2014-04-01|   19805|            4|          25.00|
|2014-04-01|   19805|            6|         126.00|
|2014-04-01|   19805|            7|         125.00|
|2014-04-01|   19805|            8|           4.00|
+----------+--------+-------------+---------------+
only showing top 5 rows



In [20]:
all_delays_2014.createOrReplaceTempView("all_delays")

In [21]:
all_delays_2014.orderBy(all_delays_2014.departure_delay.desc()).show(5)

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-18|   20366|         4928|          99.00|
|2014-04-18|   20366|         5819|          99.00|
|2014-04-17|   19393|          661|          99.00|
|2014-04-18|   19977|          560|          99.00|
|2014-04-18|   20409|         1205|          99.00|
+----------+--------+-------------+---------------+
only showing top 5 rows



In [22]:
delay_count = spark.sql("SELECT COUNT(departure_delay) from all_delays")

In [23]:
delay_count.show()

+----------------------+
|count(departure_delay)|
+----------------------+
|                179015|
+----------------------+



In [24]:
delay_count.collect()[0][0]

179015

In [25]:
delay_percent = (delay_count.collect()[0][0]/flights_count.collect()[0][0])*100
delay_percent

37.53871510922012

In [26]:
delay_per_airline = spark.sql(" SELECT airlines, departure_delay FROM flights")\
                         .groupBy("airlines")\
                         .agg({"departure_delay":"avg"})\
                         .withColumnRenamed("avg(departure_delay)","departure_delay")

In [27]:
delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



In [28]:
delay_per_airline.createOrReplaceTempView("delay_per_airline")

In [29]:
delay_per_airline = spark.sql("SELECT * FROM delay_per_airline ORDER BY departure_delay DESC")

In [30]:
delay_per_airline.show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



In [31]:
delay_per_airline = spark.sql ( "SELECT * FROM delay_per_airline "+
                               "JOIN airlines ON delay_per_airline.airlines = airlines.code "+
                               "ORDER BY departure_delay DESC")

In [32]:
delay_per_airline.show(5)

+--------+------------------+-----+--------------------+
|airlines|   departure_delay| Code|         Description|
+--------+------------------+-----+--------------------+
|   19393|13.429567657134724|19393|Southwest Airline...|
|   20366|12.296210112379818|20366|ExpressJet Airlin...|
|   19977| 8.818392620527979|19977|United Air Lines ...|
|   20436| 8.716275167785234|20436|Frontier Airlines...|
|   20409|  8.31110357194785|20409| JetBlue Airways: B6|
+--------+------------------+-----+--------------------+
only showing top 5 rows



In [33]:
products = spark.read\
                .format("csv")\
                .option("header","true")\
                .load("/home/demo/products.csv")

In [34]:
products.show(5)

+----------+--------+-----+
|   product|category|price|
+----------+--------+-----+
|Samsung TX|  Tablet|  999|
|Samsung JX|  Mobile|  799|
|Redmi Note|  Mobile|  399|
|        Mi|  Mobile|  299|
|      iPad|  Tablet|  789|
+----------+--------+-----+
only showing top 5 rows



In [36]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [37]:
windowSpecl = Window.partitionBy(products['category'])\
                    .orderBy(products['price'].desc())

In [40]:
price_rank = (func.rank().over(windowSpecl))

In [41]:
product_rank = products.select ( 
    products['product'],
    products['category'],
    products['price']
).withColumn('rank',func.rank().over(windowSpecl))
product_rank.show()

+----------+--------+-----+----+
|   product|category|price|rank|
+----------+--------+-----+----+
|    iPhone|  Mobile|  999|   1|
|Samsung JX|  Mobile|  799|   2|
|Redmi Note|  Mobile|  399|   3|
|   OnePlus|  Mobile|  356|   4|
|        Mi|  Mobile|  299|   5|
|  Micromax|  Mobile|  249|   6|
|Samsung TX|  Tablet|  999|   1|
|      iPad|  Tablet|  789|   2|
|    Lenovo|  Tablet|  499|   3|
|        Xu|  Tablet|  267|   4|
+----------+--------+-----+----+



In [50]:
windowSpecl2 = Window.partitionBy(products['category'])\
                    .orderBy(products['price'].desc())\
                    .rowsBetween(-1,0)

In [51]:
price_max = (func.max(products['price']).over(windowSpecl2))

In [52]:
 products.select ( 
    products['product'],
    products['category'],
    products['price'],
    price_max.alias("price_max")).show()

+----------+--------+-----+---------+
|   product|category|price|price_max|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      799|
|   OnePlus|  Mobile|  356|      399|
|        Mi|  Mobile|  299|      356|
|  Micromax|  Mobile|  249|      299|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      789|
|        Xu|  Tablet|  267|      499|
+----------+--------+-----+---------+



In [47]:
windowSpecl3 = Window.partitionBy(products['category'])\
                    .orderBy(products['price'].desc())\
                    .rangeBetween(-sys.maxsize,sys.maxsize)

In [49]:
price_difference = (func.max(products['price']).over(windowSpecl3) - products['price'])

In [53]:
products.select ( 
    products['product'],
    products['category'],
    products['price'],
    price_difference.alias("price_difference")).show()

+----------+--------+-----+----------------+
|   product|category|price|price_difference|
+----------+--------+-----+----------------+
|    iPhone|  Mobile|  999|             0.0|
|Samsung JX|  Mobile|  799|           200.0|
|Redmi Note|  Mobile|  399|           600.0|
|   OnePlus|  Mobile|  356|           643.0|
|        Mi|  Mobile|  299|           700.0|
|  Micromax|  Mobile|  249|           750.0|
|Samsung TX|  Tablet|  999|             0.0|
|      iPad|  Tablet|  789|           210.0|
|    Lenovo|  Tablet|  499|           500.0|
|        Xu|  Tablet|  267|           732.0|
+----------+--------+-----+----------------+

