In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Local SparkSession with two worker threads; getOrCreate() reuses an existing
# session on re-run instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("analysis-flights")
    .master("local[2]")
    .getOrCreate()
)

In [None]:
# Display the active SparkSession created in the previous cell (the cell's last expression is rendered).
spark

In [None]:
# Load the flights CSV.
# - header: the first row provides the column names.
# - inferSchema: Spark scans the data once more to pick numeric/timestamp types.
#   Without it every column is read as a string, so describe()/summary() compare
#   min/max as text and numeric aggregates rely on implicit string casts.
# - mode=PERMISSIVE: malformed rows are kept (unparsable fields become null)
#   instead of failing the read.
flight_df = (
    spark.read
    .format("csv")
    .option("mode", "PERMISSIVE")
    .option("header", True)
    .option("inferSchema", True)
    .load("flights.csv")
)

In [None]:
# Preview the first 20 rows; long cell values are truncated (show()'s defaults, spelled out).
flight_df.show(n=20, truncate=True)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [None]:
# List the databases known to the session catalog (only `default` in this run).
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

In [None]:
# List tables/views in the current database. The result is empty because flight_df
# has not been registered as a table or temp view (e.g. via createOrReplaceTempView).
spark.catalog.listTables()

[]

In [None]:
# Basic per-column statistics (count, mean, stddev, min, max) rendered as a table.
flight_df.describe().show(n=20, truncate=True)

+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|summary|               id|  year|             month|              day|          dep_time|    sched_dep_time|         dep_delay|          arr_time|   sched_arr_time|        arr_delay|carrier|            flight|tailnum|origin|  dest|          air_time|          distance|              hour|            minute|          time_hour|                name|
+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+-----------

In [None]:
import pyspark.sql.functions as F

# Null count per column: F.when(...) yields a value only where the cell is null,
# and F.count counts only non-null values, so each aggregate is that column's null count.
flight_df.select([
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in flight_df.columns
]).show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour|name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|  0|   0|    0|  0|    8255|             0|     8255|    8713|             0|     9430|      0|     0|   2512|     0|   0|    9430|       0|   0|     0|        0|   0|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+



In [None]:
# Dataset shape as (row count, column count); count() runs a full Spark job.
(flight_df.count(), len(flight_df.columns))

(336776, 21)

In [None]:
# Column names in schema order.
list(flight_df.columns)

['id',
 'year',
 'month',
 'day',
 'dep_time',
 'sched_dep_time',
 'dep_delay',
 'arr_time',
 'sched_arr_time',
 'arr_delay',
 'carrier',
 'flight',
 'tailnum',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute',
 'time_hour',
 'name']

In [None]:
# Print each column's name, data type and nullability as a tree.
flight_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- sched_dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- sched_arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- time_hour: string (nullable = true)
 |-- name: string (nullable = true)



In [None]:
# Summary statistics (count, mean, stddev, min, max) per column.
# NOTE(review): same statistics as the earlier describe() cell. vertical=True
# prints each statistic as a record with one line per column, instead of a single
# 22-column-wide table whose rendered output gets cut off.
flight_df.describe().show(vertical=True)


+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|summary|               id|  year|             month|              day|          dep_time|    sched_dep_time|         dep_delay|          arr_time|   sched_arr_time|        arr_delay|carrier|            flight|tailnum|origin|  dest|          air_time|          distance|              hour|            minute|          time_hour|                name|
+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+-----------

In [None]:

# Extended per-column statistics: describe()'s set plus approximate quartiles.
# These are exactly summary()'s defaults, listed explicitly.
flight_df.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()


+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+------------------+-------------------+--------------------+
|summary|               id|  year|             month|              day|          dep_time|    sched_dep_time|         dep_delay|          arr_time|   sched_arr_time|        arr_delay|carrier|            flight|tailnum|origin|  dest|          air_time|          distance|              hour|            minute|          time_hour|                name|
+-------+-----------------+------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-------+------------------+-------+------+------+------------------+------------------+------------------+-----------

In [None]:
# Number of distinct values in every column (e.g. id matches the row count,
# so it is a unique key; year holds a single value).
from pyspark.sql.functions import countDistinct

flight_df.select(
    *[countDistinct(column_name).alias(column_name) for column_name in flight_df.columns]
).show()


+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|    id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour|name|
+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|336776|   1|   12| 31|    1318|          1021|      527|    1411|          1163|      577|     16|  3844|   4043|     3| 105|     509|     214|  20|    60|     6936|  16|
+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+



In [None]:
# Spark's sum is imported under an alias: a bare `import sum` would shadow
# Python's built-in sum() for every later cell in this kernel.
from pyspark.sql.functions import col, sum as spark_sum

# Count null values in each column by summing a 0/1 "is null" indicator.
# NOTE(review): gives the same counts as the earlier F.count(F.when(...)) null-count cell.
flight_df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in flight_df.columns]).show()


+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour|name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|  0|   0|    0|  0|    8255|             0|     8255|    8713|             0|     9430|      0|     0|   2512|     0|   0|    9430|       0|   0|     0|        0|   0|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+



In [None]:
from pyspark.sql.functions import avg

# Mean air time over all flights (null air_time values are skipped by avg).
flight_df.agg(avg("air_time")).show()

+------------------+
|     avg(air_time)|
+------------------+
|150.68646019807787|
+------------------+



In [None]:
# Total air time per carrier. The aggregated DataFrame is kept in flight_grouped
# and displayed separately: .show() prints and returns None, so chaining it into
# the assignment would leave flight_grouped = None.
flight_grouped = flight_df.groupBy("carrier").agg({"air_time": "sum"})
flight_grouped.show()

+-------+-------------+
|carrier|sum(air_time)|
+-------+-------------+
|     UA|  1.2237728E7|
|     AA|    6032306.0|
|     EV|    4603614.0|
|     B6|    8170975.0|
|     DL|    8277661.0|
|     OO|       2421.0|
|     F9|     156357.0|
|     YV|      35763.0|
|     US|    1756507.0|
|     MQ|    2282880.0|
|     HA|     213096.0|
|     AS|     230863.0|
|     FL|     321132.0|
|     VX|    1724104.0|
|     WN|    1780402.0|
|     9E|    1500801.0|
+-------+-------------+

