### Importing Spark Session

In [1]:
from pyspark.sql import SparkSession

### Creating Spark Session

In [2]:
# Obtain a Spark session for this notebook under the application name "MyProject".
spark = (
    SparkSession.builder
    .appName("MyProject")
    .getOrCreate()
)


### Initializing a SparkSession with configurations

In [3]:
# Build the session used for the CSV -> MongoDB workflow.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists in this kernel; startup-only options such as spark.jars.packages may
# then not take effect -- confirm by running this cell on a fresh kernel.
session_builder = (
    SparkSession.builder
    .appName("CSVtoMongoDB")
    # Output URI for the MongoDB connector (database "flights", collection "cleaned").
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/flights.cleaned")
    # Connector package coordinates to be resolved for the session.
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .config("spark.sql.catalogImplementation", "in-memory")
)
spark = session_builder.getOrCreate()

### Reading Data using Spark

In [4]:
# Directory that holds one CSV per year, each named "<year>.csv".
# NOTE: this is a machine-specific absolute path; change it here (and only
# here) when running the notebook on another machine.
DATA_DIR = "C:/Users/msrih/Downloads/UMBC CLASSES/603/Data Sets"

# First and last year of data to load (both inclusive).
FIRST_YEAR, LAST_YEAR = 2009, 2018

# Map each year (int) to the path of its CSV file, in ascending year order.
file_paths = {
    year: f"{DATA_DIR}/{year}.csv"
    for year in range(FIRST_YEAR, LAST_YEAR + 1)
}

# One DataFrame per year, keyed by year.
# header=True takes column names from the first row of each file;
# inferSchema=True asks Spark to infer column types from the data.
dfs = {
    yr: spark.read.csv(csv_path, header=True, inferSchema=True)
    for yr, csv_path in file_paths.items()
}


In [11]:
# Preview the first record of the 2009 DataFrame, printed one field per line.
dfs[2009].show(1, vertical=True)

-RECORD 0-------------------------
 FL_DATE             | 2009-01-01 
 OP_CARRIER          | XE         
 OP_CARRIER_FL_NUM   | 1204       
 ORIGIN              | DCA        
 DEST                | EWR        
 CRS_DEP_TIME        | 1100       
 DEP_TIME            | 1058.0     
 DEP_DELAY           | -2.0       
 TAXI_OUT            | 18.0       
 WHEELS_OFF          | 1116.0     
 WHEELS_ON           | 1158.0     
 TAXI_IN             | 8.0        
 CRS_ARR_TIME        | 1202       
 ARR_TIME            | 1206.0     
 ARR_DELAY           | 4.0        
 CANCELLED           | 0.0        
 CANCELLATION_CODE   | NULL       
 DIVERTED            | 0.0        
 CRS_ELAPSED_TIME    | 62.0       
 ACTUAL_ELAPSED_TIME | 68.0       
 AIR_TIME            | 42.0       
 DISTANCE            | 199.0      
 CARRIER_DELAY       | NULL       
 WEATHER_DELAY       | NULL       
 NAS_DELAY           | NULL       
 SECURITY_DELAY      | NULL       
 LATE_AIRCRAFT_DELAY | NULL       
 Unnamed: 27        

In [12]:
# Report the Python type of the object stored for each year.
print("Data type of the loaded dataframes:")
for yr in dfs:
    print(f'dfs[{yr}]: {type(dfs[yr])}')


Data type of the loaded dataframes:
dfs[2009]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2010]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2011]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2012]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2013]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2014]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2015]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2016]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2017]: <class 'pyspark.sql.dataframe.DataFrame'>
dfs[2018]: <class 'pyspark.sql.dataframe.DataFrame'>


In [13]:
from functools import reduce
from pyspark.sql import DataFrame

# Collect the per-year DataFrames in the dictionary's iteration order.
dfs_list = list(dfs.values())

# Fold the list into one DataFrame by appending each year to the running result.
# unionAll aligns columns by position, not by name.
df = reduce(lambda combined, yearly: combined.unionAll(yearly), dfs_list)


In [14]:
# Print the schema (column names, types, nullability) of the combined DataFrame `df`.
df.printSchema()  

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: double (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: double (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY: double

In [15]:
# Total number of rows in the combined DataFrame.
df.count()

61556964

In [16]:
# Number of columns in the combined DataFrame.
len(df.columns)

28

In [17]:
# Column names of the combined DataFrame, in order.
df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ACTUAL_ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'Unnamed: 27']

In [18]:
#To check the number of null values
from pyspark.sql.functions import col, isnan, unix_timestamp, when, count

def _missing_value_count(name):
    """Build an aggregate column counting the missing-looking rows of ``name``.

    A row is counted when the column value contains the substring 'None' or
    'NULL', equals 'NA', is null, or is NaN. For 'FL_DATE' the NaN test is
    applied to ``unix_timestamp(name)`` instead of the column itself.

    Args:
        name: Name of the column in ``df`` to inspect.

    Returns:
        A Column expression, aliased to ``name``, holding the count.
    """
    column = col(name)
    # Only FL_DATE is converted with unix_timestamp before the NaN check.
    nan_probe = unix_timestamp(name) if name == 'FL_DATE' else name
    looks_missing = (
        column.contains('None')
        | column.contains('NULL')
        | (column == 'NA')
        | column.isNull()
        | isnan(nan_probe)
    )
    # when(...) without otherwise() yields null for non-matching rows, and
    # count() skips nulls, so only matching rows are tallied.
    return count(when(looks_missing, name)).alias(name)


# Single-row DataFrame: one missing-value count per column of df.
uniondf2 = df.select([_missing_value_count(c) for c in df.columns])

uniondf2.show(vertical=True)

-RECORD 0-----------------------
 FL_DATE             | 0        
 OP_CARRIER          | 0        
 OP_CARRIER_FL_NUM   | 0        
 ORIGIN              | 0        
 DEST                | 0        
 CRS_DEP_TIME        | 1        
 DEP_TIME            | 935723   
 DEP_DELAY           | 940675   
 TAXI_OUT            | 963901   
 WHEELS_OFF          | 963896   
 WHEELS_ON           | 997016   
 TAXI_IN             | 997015   
 CRS_ARR_TIME        | 2        
 ARR_TIME            | 997015   
 ARR_DELAY           | 1121351  
 CANCELLED           | 0        
 CANCELLATION_CODE   | 60583755 
 DIVERTED            | 0        
 CRS_ELAPSED_TIME    | 60       
 ACTUAL_ELAPSED_TIME | 1118754  
 AIR_TIME            | 1118753  
 DISTANCE            | 0        
 CARRIER_DELAY       | 50166224 
 WEATHER_DELAY       | 50166224 
 NAS_DELAY           | 50166224 
 SECURITY_DELAY      | 50166224 
 LATE_AIRCRAFT_DELAY | 50166224 
 Unnamed: 27         | 61556964 

