In [21]:
#Import libraries
import findspark
findspark.init()

In [22]:
import os
import random

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, mean, count, when, isnan, concat_ws
from pyspark.sql.types import FloatType, IntegerType

In [23]:
# Create (or reuse) the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName('Read CSV File into DataFrame')
    .getOrCreate()
)


In [24]:
''' This step is to write data into Postgresql database'''
#Connection details

PSQL_SERVERNAME = "localhost"
PSQL_PORTNUMBER = 5432
PSQL_DBNAME = "my_db"
PSQL_USRRNAME = "myuser"
PSQL_PASSWORD = "123"
TABLE_POSTGRES = "flight2008" #table name you want to create in Postgres database

In [26]:
# JDBC connection string (jdbc:postgresql://host:port/database). The port is
# included so that a non-default PSQL_PORTNUMBER is actually honoured.
URL = f"jdbc:postgresql://{PSQL_SERVERNAME}:{PSQL_PORTNUMBER}/{PSQL_DBNAME}"

In [27]:
# Extract data:
def extract_data(spark, path='D:/DE Mentoring/data/flights_2008_7M.csv'):
    """Read the flights CSV file into a Spark DataFrame.

    Args:
        spark: Active SparkSession used to read the file.
        path: Location of the CSV file. Defaults to the original local path so
            existing ``extract_data(spark)`` calls keep working; pass a different
            path to run the notebook on another machine or dataset.

    Returns:
        DataFrame whose column names come from the CSV header and whose column
        types are inferred by Spark (``inferSchema=True`` costs an extra pass
        over the file).
    """
    return spark.read.csv(path, sep=',', header=True, inferSchema=True)

In [28]:
# Load the raw data through extract_data so the path and read options live in one place.
df = extract_data(spark)


In [15]:
# Peek at the first record to see column names and raw values.
df.take(1)

[Row(year=2008, month=1, dayofmonth=3, dayofweek=4, deptime='2003', crsdeptime=1955, arrtime='2211', crsarrtime=2225, uniquecarrier='WN', flightnum=335, tailnum='N712SW', actualelapsedtime='128', crselapsedtime='150', airtime='116', arrdelay='-14', depdelay='8', origin='IAD', dest='TPA', distance=810, taxiin='4', taxiout='8', cancelled=0, cancellationcode=None, diverted=0, carrierdelay='NA', weatherdelay='NA', nasdelay='NA', securitydelay='NA', lateaircraftdelay='NA', dep_timestamp='2008-01-03 20:03:00', arr_timestamp='2008-01-03 22:11:00', carrier_name='Southwest Airlines', plane_type='Corporation', plane_manufacturer='BOEING', plane_issue_date='07/08/1998', plane_model='737-7H4', plane_status='Valid', plane_aircraft_type='Fixed Wing Multi-Engine', plane_engine_type='Turbo-Fan', plane_year='1998', origin_name='Washington Dulles International', origin_city='Chantilly', origin_state='VA', origin_country='USA', origin_lat=38.94453194, origin_lon=-77.45580972, dest_name='Tampa Internation

In [29]:
 # Row and column counts of the raw DataFrame.
# The counts use n_rows / n_cols rather than `row` / `col`: assigning to `col` would
# shadow pyspark.sql.functions.col (imported at the top), and later cells could
# then no longer call it.
n_rows = df.count()
n_cols = len(df.columns)

print(f'Dimension of the Dataframe is: {(n_rows, n_cols)}')
print(f'Number of Rows are: {n_rows}')
print(f'Number of Columns are: {n_cols}')


Dimension of the Dataframe is: (7009728, 56)
Number of Rows are: 7009728
Number of Columns are: 56


In [30]:
# List every column name of the raw DataFrame.
print(df.schema.names)

['year', 'month', 'dayofmonth', 'dayofweek', 'deptime', 'crsdeptime', 'arrtime', 'crsarrtime', 'uniquecarrier', 'flightnum', 'tailnum', 'actualelapsedtime', 'crselapsedtime', 'airtime', 'arrdelay', 'depdelay', 'origin', 'dest', 'distance', 'taxiin', 'taxiout', 'cancelled', 'cancellationcode', 'diverted', 'carrierdelay', 'weatherdelay', 'nasdelay', 'securitydelay', 'lateaircraftdelay', 'dep_timestamp', 'arr_timestamp', 'carrier_name', 'plane_type', 'plane_manufacturer', 'plane_issue_date', 'plane_model', 'plane_status', 'plane_aircraft_type', 'plane_engine_type', 'plane_year', 'origin_name', 'origin_city', 'origin_state', 'origin_country', 'origin_lat', 'origin_lon', 'dest_name', 'dest_city', 'dest_state', 'dest_country', 'dest_lat', 'dest_lon', 'origin_merc_x', 'origin_merc_y', 'dest_merc_x', 'dest_merc_y']


In [19]:
# Distinct origin countries present in the data.
df[['origin_country']].distinct().show()

+--------------+
|origin_country|
+--------------+
|           USA|
+--------------+



In [20]:
# Distinct destination countries present in the data.
df[['dest_country']].distinct().show()

+------------+
|dest_country|
+------------+
|         USA|
+------------+



In [31]:
# Distinct years present in the data.
df[['year']].distinct().show()

+----+
|year|
+----+
|2008|
+----+



In [None]:
# origin_country and dest_country each contain a single distinct value (USA), and year
# contains only 2008, so this dataset covers domestic flights within the USA in 2008.

In [None]:
def _count_missing(df):
    """Return a one-row DataFrame counting missing-looking values in each column.

    A value counts as missing if it is null, NaN, an empty string, or contains
    the text 'Null'.
    """
    # F.col is used instead of the bare `col` import because `col` can be rebound
    # at notebook level (e.g. by a cell that stores a column count in `col`).
    return df.select([
        F.count(F.when(F.col(c).contains('Null')
                       | (F.col(c) == '')
                       | F.col(c).isNull()
                       | F.isnan(c), c)).alias(c)
        for c in df.columns
    ])


def transform_data(df, show_missing=False):
    """Select, clean and reshape the flight columns that are loaded into PostgreSQL.

    Steps:
      1. Keep only the columns needed downstream.
      2. Cast ``airtime`` to float and ``depdelay`` to int. Values that cannot be
         cast (the raw CSV stores missing values as strings such as 'NA') become
         null under Spark's default, non-ANSI cast behaviour.
      3. Fill null ``airtime`` with the column mean rounded to 2 decimals, and
         null ``depdelay`` with 0.
      4. Replace ``year``/``month``/``dayofmonth`` with a single ``date`` column.

    The resulting schema and the first 20 rows are printed as a side effect.

    Args:
        df: Raw flights DataFrame, e.g. as returned by ``extract_data``.
        show_missing: If True, also print a per-column count of missing values
            after the casts. It is off by default because it scans the whole dataset.

    Returns:
        DataFrame with columns origin, dest, date, distance, carrier_name,
        airtime, depdelay, origin_city, dest_city, cancelled.
    """
    # Keep only the columns that are written to the database.
    df_write = df.select('year', 'month', 'dayofmonth', 'origin', 'dest', 'distance',
                         'carrier_name', 'airtime', 'depdelay', 'origin_city',
                         'dest_city', 'cancelled')

    # airtime/depdelay arrive as strings; cast them to numeric types.
    df_write = (df_write
                .withColumn("airtime", F.col("airtime").cast(FloatType()))
                .withColumn("depdelay", F.col("depdelay").cast(IntegerType())))
    df_write.printSchema()

    if show_missing:
        _count_missing(df_write).show()

    # F.round must be used here: the builtin round() cannot round a Spark Column.
    mean_airtime = df_write.select(F.round(F.mean("airtime"), 2)).collect()[0][0]
    # The mean is None when every airtime value is null; fillna(None) would raise.
    if mean_airtime is not None:
        df_write = df_write.fillna(value=mean_airtime, subset=['airtime'])
    df_write = df_write.fillna(value=0, subset=['depdelay'])

    # Build "yyyy-m-d" strings and cast them to a date, then drop the source parts.
    df_write = (df_write
                .withColumn("date",
                            F.concat_ws("-", F.col("year"), F.col("month"),
                                        F.col("dayofmonth")).cast("date"))
                .drop('year', 'month', 'dayofmonth')
                .select('origin', 'dest', 'date', 'distance', 'carrier_name', 'airtime',
                        'depdelay', 'origin_city', 'dest_city', 'cancelled'))
    df_write.show(20)

    return df_write


In [8]:
# Loading data:
def load_data(df_write, mode="errorifexists"):
    """Write ``df_write`` to the PostgreSQL table ``TABLE_POSTGRES`` over JDBC.

    Connection settings are read from the module-level URL, PSQL_USRRNAME and
    PSQL_PASSWORD values.

    Args:
        df_write: DataFrame to persist, normally the output of ``transform_data``.
        mode: Spark save mode. The default, ``"errorifexists"``, is Spark's own
            default, so the write fails if the table already exists. Pass
            ``"overwrite"`` or ``"append"`` to re-run the notebook against an
            existing table.
    """
    (df_write.write
        .format("jdbc")
        .mode(mode)
        .option("url", URL)
        .option("dbtable", TABLE_POSTGRES)
        .option("user", PSQL_USRRNAME)
        .option("password", PSQL_PASSWORD)
        .save())

In [9]:
if __name__ == "__main__":
    # Run the ETL pipeline once: extract -> transform -> load.
    # (The previous extra extract_data(spark) call, whose result was discarded,
    # re-read and re-inferred the schema of the whole CSV for nothing.)
    extract = extract_data(spark)
    transform = transform_data(extract)
    load_data(transform)

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- airtime: float (nullable = true)
 |-- depdelay: integer (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- cancelled: integer (nullable = true)

+----+-----+----------+------+----+--------+------------+-------+--------+-----------+---------+---------+
|year|month|dayofmonth|origin|dest|distance|carrier_name|airtime|depdelay|origin_city|dest_city|cancelled|
+----+-----+----------+------+----+--------+------------+-------+--------+-----------+---------+---------+
|   0|    0|         0|     0|   0|       0|           0| 154699|  136246|          0|        0|        0|
+----+-----+----------+------+----+--------+------------+-------+--------+----------