## Importing Required Libraries

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

## Initializing a spark session

In [4]:
spark = SparkSession.builder.appName('Testing').getOrCreate()

In [5]:
spark

### Reading data from a csv file

In [6]:
df = spark.read.option('header', 'true').csv('airline-safety.csv', inferSchema = True)
# or
# df = spark.read.csv('airline-safety.csv', header = True, inferSchema = True)

# option('header', 'true')  -  To enable the first row as the column name
# inferSchema = True)       -  If false, every data will be taken as string

In [7]:
type(df)

pyspark.sql.dataframe.DataFrame

In [8]:
df.show(5)

+--------------------+----------------------+---------------+---------------------+----------------+---------------+---------------------+----------------+
|             airline|avail_seat_km_per_week|incidents_85_99|fatal_accidents_85_99|fatalities_85_99|incidents_00_14|fatal_accidents_00_14|fatalities_00_14|
+--------------------+----------------------+---------------+---------------------+----------------+---------------+---------------------+----------------+
|          Aer Lingus|             320906734|              2|                    0|               0|              0|                    0|               0|
|           Aeroflot*|            1197672318|             76|                   14|             128|              6|                    1|              88|
|Aerolineas Argent...|             385803648|              6|                    0|               0|              1|                    0|               0|
|         Aeromexico*|             596871813|              3|   

### Check the Schema of the dataframe

In [9]:
df.printSchema()

root
 |-- airline: string (nullable = true)
 |-- avail_seat_km_per_week: long (nullable = true)
 |-- incidents_85_99: integer (nullable = true)
 |-- fatal_accidents_85_99: integer (nullable = true)
 |-- fatalities_85_99: integer (nullable = true)
 |-- incidents_00_14: integer (nullable = true)
 |-- fatal_accidents_00_14: integer (nullable = true)
 |-- fatalities_00_14: integer (nullable = true)



### Playing with columns and rows

In [10]:
# Select or extract specific columns into a separate dataframe

df.select(['airline', 'fatal_accidents_85_99']).show(5)

type(df.select(['airline', 'fatal_accidents_85_99']))

+--------------------+---------------------+
|             airline|fatal_accidents_85_99|
+--------------------+---------------------+
|          Aer Lingus|                    0|
|           Aeroflot*|                   14|
|Aerolineas Argent...|                    0|
|         Aeromexico*|                    1|
|          Air Canada|                    0|
+--------------------+---------------------+
only showing top 5 rows



pyspark.sql.dataframe.DataFrame

In [11]:
df['airline']

Column<'airline'>

In [12]:
# Check the datatypes of the columns

df.dtypes

[('airline', 'string'),
 ('avail_seat_km_per_week', 'bigint'),
 ('incidents_85_99', 'int'),
 ('fatal_accidents_85_99', 'int'),
 ('fatalities_85_99', 'int'),
 ('incidents_00_14', 'int'),
 ('fatal_accidents_00_14', 'int'),
 ('fatalities_00_14', 'int')]

In [13]:
# Similar to pandas describe

df.describe().show()

+-------+---------------+----------------------+------------------+---------------------+------------------+------------------+---------------------+------------------+
|summary|        airline|avail_seat_km_per_week|   incidents_85_99|fatal_accidents_85_99|  fatalities_85_99|   incidents_00_14|fatal_accidents_00_14|  fatalities_00_14|
+-------+---------------+----------------------+------------------+---------------------+------------------+------------------+---------------------+------------------+
|  count|             56|                    56|                56|                   56|                56|                56|                   56|                56|
|   mean|           NULL|   1.384621304732143E9| 7.178571428571429|   2.1785714285714284|112.41071428571429|             4.125|   0.6607142857142857|55.517857142857146|
| stddev|           NULL|  1.4653168949166625E9|11.035656495456639|    2.861068731385928|146.69111354205404|4.5449772476678225|   0.8586836800228957| 111.3

In [14]:
# Drop a column from the DataFrame

col = df.select('incidents_85_99')

df = df.drop('incidents_85_99')

In [15]:
df.show(5)

+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+
|             airline|avail_seat_km_per_week|fatal_accidents_85_99|fatalities_85_99|incidents_00_14|fatal_accidents_00_14|fatalities_00_14|
+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+
|          Aer Lingus|             320906734|                    0|               0|              0|                    0|               0|
|           Aeroflot*|            1197672318|                   14|             128|              6|                    1|              88|
|Aerolineas Argent...|             385803648|                    0|               0|              1|                    0|               0|
|         Aeromexico*|             596871813|                    1|              64|              5|                    0|               0|
|          Air Canad

In [16]:
col

DataFrame[incidents_85_99: int]

In [17]:
# Add a column to the DataFrame

df = df.withColumn("incidents_85_99", lit(0))

# df['incidents_85_99'] = col['incidents_85_99']

df = df.withColumn("incidents_85_99", col['incidents_85_99'])

df.show(5)

AnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "incidents_85_99" missing from "airline", "avail_seat_km_per_week", "fatal_accidents_85_99", "fatalities_85_99", "incidents_00_14", "fatal_accidents_00_14", "fatalities_00_14", "incidents_85_99" in operator !Project [airline#17, avail_seat_km_per_week#18L, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24, incidents_85_99#19 AS incidents_85_99#768]. Attribute(s) with the same name appear in the operation: "incidents_85_99".
Please check if the right attribute(s) are used.;
!Project [airline#17, avail_seat_km_per_week#18L, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24, incidents_85_99#19 AS incidents_85_99#768]
+- Project [airline#17, avail_seat_km_per_week#18L, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24, 0 AS incidents_85_99#759]
   +- Project [airline#17, avail_seat_km_per_week#18L, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24]
      +- Relation [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] csv


In [18]:
# Renaming the column name
df = df.withColumnRenamed('airline', 'Airline_Name')

df.show(5)

+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|        Airline_Name|avail_seat_km_per_week|fatal_accidents_85_99|fatalities_85_99|incidents_00_14|fatal_accidents_00_14|fatalities_00_14|incidents_85_99|
+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|          Aer Lingus|             320906734|                    0|               0|              0|                    0|               0|              0|
|           Aeroflot*|            1197672318|                   14|             128|              6|                    1|              88|              0|
|Aerolineas Argent...|             385803648|                    0|               0|              1|                    0|               0|              0|
|         Aeromexico*|             596871813|                   

### Data Preprocessing

#### Handling missing values

##### Drop rows that have nulls

In [19]:
# Find the shape of the pySpark DataFrame
df.count(), len(df.columns)

(56, 8)

In [20]:
df.na.drop(how="any", thresh=2).count()

# how --> 'any' - drop all rows that have a null value
# how --> 'all' - drop all rows that have all the values in the row as null
# thresh --> n - drop the row if it has less than n number of non-null values

56

In [21]:
df.na.drop(how="any", subset=['avail_seat_km_per_week', 'fatal_accidents_85_99']).count()

# subset --> drop all rows that have null value in the specified columns

56

##### Filling the missing values

In [22]:
# Replace null values with a specified value

df.na.fill(0, ['fatal_accidents_85_99', 'Airline_Name']).show(5)
#       value to replace null,            columns to replace in

+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|        Airline_Name|avail_seat_km_per_week|fatal_accidents_85_99|fatalities_85_99|incidents_00_14|fatal_accidents_00_14|fatalities_00_14|incidents_85_99|
+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|          Aer Lingus|             320906734|                    0|               0|              0|                    0|               0|              0|
|           Aeroflot*|            1197672318|                   14|             128|              6|                    1|              88|              0|
|Aerolineas Argent...|             385803648|                    0|               0|              1|                    0|               0|              0|
|         Aeromexico*|             596871813|                   

In [23]:
# Replacing the null values with the mean or median of a particular column using Imputer

# from pyspark.ml.feature import Imputer

# imputer = Imputer(
#     inputCols=['fatal_accidents_85_99', 'Airline_Name'],
#     outputCols=
# )


# Do it later

#### Filter Operations

In [34]:
# Airlines with less than 1 incidents between 2000 and 2014

df.filter("incidents_00_14 <= 1").show()
# Or
df.filter(df['incidents_00_14'] <= 1).show()

# This displays all the comlumns of the filtered rows

+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|        Airline_Name|avail_seat_km_per_week|fatal_accidents_85_99|fatalities_85_99|incidents_00_14|fatal_accidents_00_14|fatalities_00_14|incidents_85_99|
+--------------------+----------------------+---------------------+----------------+---------------+---------------------+----------------+---------------+
|          Aer Lingus|             320906734|                    0|               0|              0|                    0|               0|              0|
|Aerolineas Argent...|             385803648|                    0|               0|              1|                    0|               0|              0|
|   Austrian Airlines|             358239823|                    0|               0|              1|                    0|               0|              0|
|             Avianca|             396922563|                   

In [28]:
# Filter operation with Select
# Display only the airline name and the other selected columns
df.filter("incidents_00_14 <= 1").select(['Airline_Name', 'avail_seat_km_per_week']).show()

+--------------------+----------------------+
|        Airline_Name|avail_seat_km_per_week|
+--------------------+----------------------+
|          Aer Lingus|             320906734|
|Aerolineas Argent...|             385803648|
|   Austrian Airlines|             358239823|
|             Avianca|             396922563|
|              Condor|             417982610|
|                COPA|             550491507|
|               El Al|             335448023|
|             Finnair|             506464950|
|   Hawaiian Airlines|             493877795|
|      Japan Airlines|            1574217531|
|                KLM*|            1874561773|
|          Korean Air|            1734522605|
|        LAN Airlines|            1001965891|
|       South African|             651502442|
|                TACA|             259373346|
|  TAP - Air Portugal|             619130754|
|    Vietnam Airlines|             625084918|
|     Virgin Atlantic|            1005248585|
+--------------------+------------

In [45]:
# Multiple filter conditions

# Airlines with less than 2 incidents between 2000 and 2014 and with 1 fatality

df.filter((df['incidents_00_14'] <= 2) & (df['fatalities_00_14'] == 1)
          ).select(['Airline_Name', 'avail_seat_km_per_week', 'fatalities_00_14']).show()

+-------------------+----------------------+----------------+
|       Airline_Name|avail_seat_km_per_week|fatalities_00_14|
+-------------------+----------------------+----------------+
|Philippine Airlines|             413007158|               1|
|       Thai Airways|            1702802250|               1|
+-------------------+----------------------+----------------+



In [47]:
# Inverse condition

df.filter(~(df['incidents_00_14'] <= 2) & (df['fatalities_00_14'] <= 1)
          ).select(['Airline_Name', 'avail_seat_km_per_week', 'fatalities_00_14']).show()

+--------------------+----------------------+----------------+
|        Airline_Name|avail_seat_km_per_week|fatalities_00_14|
+--------------------+----------------------+----------------+
|         Aeromexico*|             596871813|               0|
|            Alitalia|             698012498|               0|
|  All Nippon Airways|            1841234177|               0|
|    British Airways*|            3179760952|               0|
|              Iberia|            1173203126|               0|
|          Lufthansa*|            3426529504|               0|
|             Qantas*|            1917428984|               0|
|     Royal Air Maroc|             295705339|               0|
|       Saudi Arabian|             859673901|               0|
|  Southwest Airlines|            3276525770|               0|
|Sri Lankan / AirL...|             325582976|               0|
|              SWISS*|             792601299|               0|
+--------------------+----------------------+----------

#### GroupBy and Aggregate Function

##### GroupBy Operation

In [51]:
df.groupBy('Airline_Name').count().show()

# Other operations: .sum, .mean

+--------------------+-----+
|        Airline_Name|count|
+--------------------+-----+
|  Ethiopian Airlines|    1|
|               El Al|    1|
|            Alitalia|    1|
| Philippine Airlines|    1|
|           American*|    1|
|    Turkish Airlines|    1|
|United / Continen...|    1|
|          Air France|    1|
|          Lufthansa*|    1|
|      Japan Airlines|    1|
|    British Airways*|    1|
|       South African|    1|
|         Aeromexico*|    1|
|        Thai Airways|    1|
|             Qantas*|    1|
|              Iberia|    1|
|US Airways / Amer...|    1|
|              Condor|    1|
|   Hawaiian Airlines|    1|
|     Virgin Atlantic|    1|
+--------------------+-----+
only showing top 20 rows



In [54]:
df.groupBy('Airline_Name').max().show()

# Displays the maximum of each Airline_Name for each column across different rows

+--------------------+---------------------------+--------------------------+---------------------+--------------------+--------------------------+---------------------+--------------------+
|        Airline_Name|max(avail_seat_km_per_week)|max(fatal_accidents_85_99)|max(fatalities_85_99)|max(incidents_00_14)|max(fatal_accidents_00_14)|max(fatalities_00_14)|max(incidents_85_99)|
+--------------------+---------------------------+--------------------------+---------------------+--------------------+--------------------------+---------------------+--------------------+
|  Ethiopian Airlines|                  488560643|                         5|                  167|                   5|                         2|                   92|                   0|
|               El Al|                  335448023|                         1|                    4|                   1|                         0|                    0|                   0|
|            Alitalia|                  69801

##### Aggregate Operation

In [52]:
# Total incidents between 2000 and 2014
df.agg({'incidents_00_14' : 'sum'}).show()

+--------------------+
|sum(incidents_00_14)|
+--------------------+
|                 231|
+--------------------+



In [55]:
# Total Fatalities between 2000 and 2014
df.agg({'fatalities_00_14' : 'sum'}).show()

+---------------------+
|sum(fatalities_00_14)|
+---------------------+
|                 3109|
+---------------------+

