Created this notebook to start analyzing and cleaning the data

In [2]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): later cells use relative 'drive/MyDrive/...' paths, which presumably
# rely on the working directory being /content — confirm.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!ls
!pip install pyspark


drive  sample_data
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ff07dd6c4a7c00471f36b894d7f08767e300e687d443cf5ff9f7cce3a69d8f3a
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [4]:
from pyspark.sql import SparkSession
# Memory setting applied to both the Spark executors and the driver.
MAX_MEMORY = "45g"

# Build the notebook's Spark session, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("airline-delay-1")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)
# A commented-out alternative configuration used to sit here:
# master 'local[*]', driver memory "15g", app name 'my-cool-app'.

In [5]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType



In [6]:
# Folder prefix for the raw CSV input. It is a relative path and ends with '/',
# so file names are appended to it directly.
path ='drive/MyDrive/data/raw/'

Read the file from Storage

In [7]:
# Load the 2018 flights CSV with a header row, inferred column types,
# and the DROPMALFORMED parse mode.
dfx = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .option('mode', 'DROPMALFORMED')
    .csv(path + '2018.csv')
)
# Print the first 10 rows to stdout as a quick look at the data.
dfx.show(10)


+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [8]:
# Working reference to the loaded frame. The cells below rebind `df` at each
# cleaning step, while `dfx` is not reassigned in the cells shown here.
df = dfx

## Data Cleaning Steps

### Filter only the required columns for our analysis 

In [9]:
# Columns kept for the analysis; the list order is the column order of the result.
columns = [
    "FL_DATE",
    "OP_CARRIER",
    "OP_CARRIER_FL_NUM",
    "ORIGIN",
    "DEST",
    "CRS_DEP_TIME",
    "DEP_TIME",
    "DEP_DELAY",
    "WEATHER_DELAY",
    "SECURITY_DELAY",
    "LATE_AIRCRAFT_DELAY",
    "CRS_ARR_TIME",
    "ARR_TIME",
    "ARR_DELAY",
    "CANCELLED",
    "CARRIER_DELAY",
]

df = df.select(*columns)

### Remove the rows for flights that were cancelled

In [10]:
# Keep only rows whose CANCELLED flag is not 1.0, then discard the flag column.
df = df.where(df['CANCELLED'] != 1.0).drop('CANCELLED')



### Remove any rows that have null values for departure or arrival time

In [11]:
# Discard rows where DEP_TIME or ARR_TIME is null.
df = df.dropna(subset=["DEP_TIME", "ARR_TIME"])


### Convert Null values in delays to zeroes

In [12]:
# Replace nulls in the four delay-cause columns in a single pass.
df = df.fillna({
    'WEATHER_DELAY': '0.0',
    'SECURITY_DELAY': '0.0',
    'LATE_AIRCRAFT_DELAY': '0.0',
    'CARRIER_DELAY': '0.0',
})

### Change time format to readable form

In [13]:
# UDF code that splits a numeric time value into hours and minutes ("HH:MM:00")
def separateTimeStampValues(time):
  """Format a numeric HHMM clock value as an "HH:MM:00" string.

  The value is converted with ``str``; anything from the first '.' onward is
  discarded (so 1512.0 becomes "1512"), the remaining text is left-padded with
  zeros to four characters, and a colon is inserted after the first two
  characters.

  Args:
    time: Clock value such as 1517, 1512.0 or 5.0. May be None.

  Returns:
    A string like "15:17:00", or None when ``time`` is None.
  """
  # A missing value stays missing instead of being formatted from the text 'None'.
  if time is None:
    return None

  # Keep only the part before the decimal point, if there is one.
  digits = str(time).partition('.')[0]

  # Pad to four digits so that 1- and 2-digit values (e.g. 5 -> "0005")
  # still split cleanly into hours and minutes.
  digits = digits.zfill(4)
  return digits[:2] + ':' + digits[2:] + ":00"

# Register the Python formatter as a Spark UDF that returns strings.
udfSeparateTimeStampValues = F.udf(separateTimeStampValues, StringType())

# (original column, temporary column) pairs for the four time columns.
time_column_pairs = [
    ("CRS_DEP_TIME", "CRS_DEP_TIME_x"),
    ("DEP_TIME", "DEP_TIME_X"),
    ("CRS_ARR_TIME", "CRS_ARR_TIME_X"),
    ("ARR_TIME", "ARR_TIME_X"),
]

df_X = df

# Step 1: add a formatted copy of every time column under its temporary name.
for source_name, temp_name in time_column_pairs:
    df_X = df_X.withColumn(temp_name, udfSeparateTimeStampValues(df[source_name]))

# Step 2: remove the original, unformatted columns.
for source_name, temp_name in time_column_pairs:
    df_X = df_X.drop(source_name)

# Step 3: give the formatted columns the original names.
for source_name, temp_name in time_column_pairs:
    df_X = df_X.withColumnRenamed(temp_name, source_name)

df = df_X

In [14]:
## Data Preprocessing 

### Apply Encoding to three columns: Flight Carrier (OP_CARRIER), Origin (ORIGIN) and Destination (DEST)

In [15]:
from pyspark.ml.feature import StringIndexer

# Fit a StringIndexer per categorical column and append its "<column>_INDEXED" output.
for categorical_col in ("OP_CARRIER", "ORIGIN", "DEST"):
    indexer_op = StringIndexer(
        inputCol=categorical_col,
        outputCol=categorical_col + "_INDEXED",
    )
    df = indexer_op.fit(df).transform(df)

# Print the first 10 rows, including the new indexed columns.
df.show(10)

+----------+----------+-----------------+------+----+---------+-------------+--------------+-------------------+---------+-------------+------------+--------+------------+--------+------------------+--------------+------------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|DEP_DELAY|WEATHER_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|ARR_DELAY|CARRIER_DELAY|CRS_DEP_TIME|DEP_TIME|CRS_ARR_TIME|ARR_TIME|OP_CARRIER_INDEXED|ORIGIN_INDEXED|DEST_INDEXED|
+----------+----------+-----------------+------+----+---------+-------------+--------------+-------------------+---------+-------------+------------+--------+------------+--------+------------------+--------------+------------+
|2018-01-01|        UA|             2429|   EWR| DEN|     -5.0|          0.0|           0.0|                0.0|    -23.0|          0.0|    15:17:00|15:12:00|    17:45:00|17:22:00|               4.0|          15.0|         3.0|
|2018-01-01|        UA|             2427|   LAS| SFO|     -8.0|          0.0|           

Save file to drive 

In [17]:
# Output locations: one for Spark's multi-part CSV output, one for a single-part copy.
hdfs_path_for_multiple_files = 'drive/MyDrive/data/clean/hdfs_2018_m_v3'
hdfs_path_for_single_files = 'drive/MyDrive/data/clean/hdfs_2018_s_v3'

# Write the cleaned frame as CSV with a header row.
# NOTE(review): no save mode is set, so a re-run presumably fails if the
# target path already exists — confirm whether overwrite is wanted.
df.write.csv(hdfs_path_for_multiple_files, header="true")

# Read the written CSV back (header row only, no schema options given).
dataframe = spark.read.csv(hdfs_path_for_multiple_files, header='true')

# Collapse to a single partition and write the second copy.
dataframe.coalesce(1).write.csv(hdfs_path_for_single_files, header='true')