In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, StringType
import pyspark.sql.functions as f
from src.Extract.Extract import Extract
from src.Load.Load import Load
from src.Transform.Transform import Transform
import great_expectations as gx

In [3]:
df = Extract("Data/raw/dataset.csv","csv",{"inferSchema":"true","header":"true"})

File Extracted from source successfully


In [4]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Flight #: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Registration: string (nullable = true)
 |-- cn/In: string (nullable = true)
 |-- Aboard: integer (nullable = true)
 |-- Fatalities: integer (nullable = true)
 |-- Ground: integer (nullable = true)
 |-- Summary: string (nullable = true)



In [5]:
df.describe

<bound method DataFrame.describe of DataFrame[Date: string, Time: string, Location: string, Operator: string, Flight #: string, Route: string, Type: string, Registration: string, cn/In: string, Aboard: int, Fatalities: int, Ground: int, Summary: string]>

In [6]:
df.show(5,truncate=False)

+----------+-----+----------------------------------+----------------------+--------+-------------+----------------------+------------+-----+------+----------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Date      |Time |Location                          |Operator              |Flight #|Route        |Type                  |Registration|cn/In|Aboard|Fatalities|Ground|Summary                                                                                                                                                        

#### **Setting Up Great_Expectations**
 - This python library will be used to check the quality of the data reisiding in the flat file ingested
 - We will start by setting up a context, which is our entry point to Great Expectations API and different functions
 - Adding a data source, which is the source our data resides in (Spark DF, Pandas DF, Database Table, Flat file, etc..)
 - Then adding a data asset which is the specific part of a data we wish to validate/check the quality of
 - Batch Definition is used for when we only need to retrieve a specific portion of the data inside our data asset we wish to handle (say data that came in a specific time period)
 - Finally the Batch is the data itself we are going to validate/check the quality of.

In [7]:
gxContext = gx.get_context()

  timestamp = datetime.utcnow().replace(tzinfo=tzutc())


In [8]:
gxDataSource = gxContext.data_sources.add_spark("Airplane Crashes Spark Dataframe")

In [9]:
gxDataAsset = gxDataSource.add_dataframe_asset("Airplane Crashes Data Asset")

In [10]:
batchDef = gxDataAsset.add_batch_definition_whole_dataframe("Airplane Crashes Entire Dataframe")

In [11]:
dataBatch = batchDef.get_batch(batch_parameters={"dataframe":df})

#### **Setting up our Expectations**
- An expectation is self explanatory, it is what we expect from the column we are checking (to have no null values, to be unique and so on..)
- We are going to set up two expectation suites to test the quality of the dataframe before and after transformation
- An expectation suite is a bunch of expectations grouped together in order to test the dataframe against it all at once instead of writing each expectation indvidually.
- The same suite will be used to test the raw and processed Data by passing two different batches to it (Raw Batch and Processed Batch)

In [12]:
DateNotNull = gx.expectations.ExpectColumnValuesToNotBeNull(column="Date")
DateCorrectDT = gx.expectations.ExpectColumnValuesToBeOfType(column="Date",type_="DateType")
AboardCorrectDT = gx.expectations.ExpectColumnValuesToBeOfType(column="Aboard",type_="IntegerType")
FatalitiesCorrectDT = gx.expectations.ExpectColumnValuesToBeOfType(column="Fatalities",type_="IntegerType")
LocationNotNull = gx.expectations.ExpectColumnValuesToNotBeNull(column="Location")

In [13]:
ExpectationSuite = gx.ExpectationSuite("Raw Dataframe Expectation Sute")

In [14]:
ExpSuite = gxContext.suites.add(ExpectationSuite)

In [15]:
ExpSuite.add_expectation(DateNotNull)
ExpSuite.add_expectation(DateCorrectDT)
ExpSuite.add_expectation(AboardCorrectDT)
ExpSuite.add_expectation(LocationNotNull)
ExpSuite.add_expectation(FatalitiesCorrectDT)

ExpectColumnValuesToBeOfType(id='260dfe93-b212-432f-8d9d-dd85d8d21162', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, row_condition=None, condition_parser=None, column='Fatalities', mostly=1.0, type_='IntegerType')

In [16]:
#Displaying the information inside the Expectaion Suite
print(ExpSuite)

{
  "name": "Raw Dataframe Expectation Sute",
  "id": "b2d970b9-373f-4b7c-885f-c38ba8769678",
  "expectations": [
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Date"
      },
      "meta": {},
      "id": "d2c89024-cb0b-4e86-a972-4b843adaf4c3"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "Date",
        "type_": "DateType"
      },
      "meta": {},
      "id": "f20c9619-1325-480e-94ec-3722caf69440"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "Aboard",
        "type_": "IntegerType"
      },
      "meta": {},
      "id": "291af1fe-154f-4274-b1ed-7901ce07a509"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Location"
      },
      "meta": {},
      "id": "1fbf9546-d690-496f-845c-383a1fe5e5bb"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs":

In [17]:
RawDataResult = dataBatch.validate(ExpSuite)

print(ExpSuite)

  body["sentAt"] = datetime.utcnow().replace(tzinfo=tzutc()).isoformat()
Calculating Metrics: 100%|██████████| 17/17 [00:01<00:00, 11.68it/s]

{
  "name": "Raw Dataframe Expectation Sute",
  "id": "b2d970b9-373f-4b7c-885f-c38ba8769678",
  "expectations": [
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Date"
      },
      "meta": {},
      "id": "d2c89024-cb0b-4e86-a972-4b843adaf4c3"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "Date",
        "type_": "DateType"
      },
      "meta": {},
      "id": "f20c9619-1325-480e-94ec-3722caf69440"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "Aboard",
        "type_": "IntegerType"
      },
      "meta": {},
      "id": "291af1fe-154f-4274-b1ed-7901ce07a509"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Location"
      },
      "meta": {},
      "id": "1fbf9546-d690-496f-845c-383a1fe5e5bb"
    },
    {
      "type": "expect_column_values_to_be_of_type",
      "kwargs":




#### **Handling Missing Data in the Dataset**
- Since the data residing inside our Dataframe is rather historical and it would be incorrect to fill missing values using the mean, median or mode, and since dropping Rows with Null values with a threshold of 3 (3 null values inside the same row), Missing textual data will be replaced with "Not Found", Missing numerical data will be replaced with 0 (apart from Aboard, Fatalities and Ground columns as to not tamper with historical data and lead to inaccurate data)

#### **Handling incorrect Data types**
- Columns of Incorrect DataTypes will be casted into the correct Data Type, refer to the Transform Function (src -> Transform -> Transform.py)

In [18]:
ProcessedDF = Transform(df)

In [19]:
ProcessedDF.printSchema()

root
 |-- ID: long (nullable = false)
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = false)
 |-- Location: string (nullable = false)
 |-- Operator: string (nullable = false)
 |-- Flight #: string (nullable = false)
 |-- Route: string (nullable = false)
 |-- Type: string (nullable = false)
 |-- Registration: string (nullable = false)
 |-- cn/In: string (nullable = false)
 |-- Aboard: integer (nullable = true)
 |-- Fatalities: integer (nullable = true)
 |-- Ground: integer (nullable = true)



In [20]:
ProcessedDF.show(5,truncate=True)

+---+----------+---------+--------------------+--------------------+---------+-------------+--------------------+------------+---------+------+----------+------+
| ID|      Date|     Time|            Location|            Operator| Flight #|        Route|                Type|Registration|    cn/In|Aboard|Fatalities|Ground|
+---+----------+---------+--------------------+--------------------+---------+-------------+--------------------+------------+---------+------+----------+------+
|  0|1908-09-17|    17:18| Fort Myer, Virginia|Military - U.S. Army|Not Found|Demonstration|    Wright Flyer III|   Not Found|        1|     2|         1|     0|
|  1|1912-07-12|    06:30|AtlantiCity, New ...|Military - U.S. Navy|Not Found|  Test flight|           Dirigible|   Not Found|Not Found|     5|         5|     0|
|  2|1913-08-06|Not Found|Victoria, British...|             Private|        -|    Not Found|    Curtiss seaplane|   Not Found|Not Found|     1|         1|     0|
|  3|1913-09-09|    18:30|  

In [21]:
ProcessedDF.write.csv("./Data/processed",mode="overwrite",header=True)

#### **Testing the quality of the Processed Dataset**

In [22]:
processedDataBatch = batchDef.get_batch(batch_parameters={"dataframe":ProcessedDF})

In [23]:
processedDataResult = processedDataBatch.validate(ExpSuite)

print(processedDataResult)

Calculating Metrics: 100%|██████████| 17/17 [00:00<00:00, 27.68it/s]

{
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "Airplane Crashes Spark Dataframe-Airplane Crashes Data Asset",
          "column": "Date"
        },
        "meta": {},
        "id": "d2c89024-cb0b-4e86-a972-4b843adaf4c3"
      },
      "result": {
        "element_count": 5268,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": [],
        "partial_unexpected_counts": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "batch_id": "Airplane Crashes Spark Dataframe-Airplane Crashes Data Asset",
          "column": "Date",
         




#### **Loading Data into the Destination Database**


In [None]:
Load(ProcessedDF,"airplanecrashes","Processed")