# Sprint 1--Collect and Display Atomic Records

## Set Up Session

In [None]:
# Initialize PySpark
APP_NAME = "Collect and Display Atomic Records"

# Reuse the `sc` / `spark` globals when the kernel already defines them;
# a NameError on either one means the environment has to be created here.
try:
    sc and spark
except NameError:
    # findspark locates the Spark installation so pyspark can be imported
    # from a plain Python kernel.
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    # Go through the builder first so APP_NAME is part of the configuration
    # used when the underlying SparkContext is created, then derive `sc` from
    # the session. Creating a bare SparkContext first and wrapping it in
    # SparkSession(sc) leaves the context without the application name, and
    # fails outright if a context already exists but `spark` does not.
    spark = pyspark.sql.SparkSession.builder.appName(APP_NAME).getOrCreate()
    sc = spark.sparkContext

print("PySpark initiated.")

## ETL

### Load Raw Dataset in CSV

In [None]:
# Load the raw on-time performance CSV from storage; the first line of the
# file is used as the header.
# NOTE(review): 'com.databricks.spark.csv' and 'treatEmptyValuesAsNulls' come
# from the external spark-csv package -- confirm they are still honoured by
# the Spark version in use before relying on empty values being nulls.
on_time_dataframe = spark.read.format(
    'com.databricks.spark.csv'
).options(
    header='true',
    treatEmptyValuesAsNulls='true'
).load(
    '../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.csv'
)

# Make the DataFrame queryable from Spark SQL under "on_time_performance".
# createOrReplaceTempView is the non-deprecated replacement for
# registerTempTable (deprecated since Spark 2.0).
on_time_dataframe.createOrReplaceTempView("on_time_performance")

In [None]:
# Extract only the needed fields, casting the numeric columns from the
# string values read out of the CSV.
# NOTE(review): DepDelayMinutes is cast to int while ArrDelayMinutes is cast
# to float -- confirm this asymmetry is intended.
on_time_dataframe = spark.sql(
"""
    SELECT  
        Year, 
        Quarter, 
        Month, 
        DayofMonth, 
        DayOfWeek, 
        FlightDate,  
        Carrier, 
        TailNum, 
        FlightNum,  
        Origin, 
        OriginCityName, 
        OriginState,  
        Dest, 
        DestCityName, 
        DestState,  
        DepTime, 
        cast(DepDelay as float), 
        cast(DepDelayMinutes as int),  
        cast(TaxiOut as float), 
        cast(TaxiIn as float),  
        WheelsOff, 
        WheelsOn,  
        ArrTime, 
        cast(ArrDelay as float), 
        cast(ArrDelayMinutes as float),  
        cast(Cancelled as int), 
        cast(Diverted as int),  
        cast(ActualElapsedTime as float), 
        cast(AirTime as float),  
        cast(Flights as int), 
        cast(Distance as float),  
        cast(CarrierDelay as float), 
        cast(WeatherDelay as float),   
        cast(NASDelay as float),  
        cast(SecurityDelay as float),   
        cast(LateAircraftDelay as float),  
        CRSDepTime, 
        CRSArrTime
    FROM
        on_time_performance
"""
)

# Re-point the "on_time_performance" view at the trimmed, typed DataFrame.
# createOrReplaceTempView replaces the deprecated registerTempTable
# (deprecated since Spark 2.0).
on_time_dataframe.createOrReplaceTempView("on_time_performance")

# Last expression of the cell: the row count is what the notebook displays.
on_time_dataframe.count()

### Dump as Parquet & JSON Lines to Compress

#### Write

In [None]:
# Write the dataset as gzip-compressed JSON Lines. Using the DataFrame writer
# with mode("overwrite") lets this cell be re-run; the previous
# toJSON().saveAsTextFile(...) call raised as soon as the output path existed.
on_time_dataframe.write.mode("overwrite").json(
    "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.jsonl.gz",
    compression="gzip"
)

# Write the same dataset as Parquet, also replacing any earlier output so the
# cell stays re-runnable (the default save mode errors on an existing path).
on_time_dataframe.write.mode("overwrite").parquet(
    "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.parquet"
)

#### Compare File Sizes in Different Formats

In [None]:
%%bash

# On time performance dataset in CSV (the raw file loaded at the start of the ETL)
ls -sh "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.csv"

In [None]:
%%bash

# On time performance dataset in gzip-compressed JSON Lines
ls -sh "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.jsonl.gz"

In [None]:
%%bash

# On time performance dataset in Parquet
ls -sh "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.parquet"

#### Read Back

In [None]:
# Read the gzipped JSON Lines copy back in ...
on_time_dataframe = spark.read.format("json").load(
    "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.jsonl.gz"
)

# ... then rebind the same name to the Parquet copy, which is the DataFrame
# left in scope once this cell has finished.
on_time_dataframe = spark.read.format("parquet").load(
    "../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.parquet"
)

### Inject into MongoDB 

#### Set Up Session

In [None]:
import pymongo
import pymongo_spark

In [None]:
# Read the Parquet copy of the dataset
on_time_dataframe = spark.read.parquet("../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.parquet")

# Important: pymongo_spark has to be ACTIVATED before the save below
pymongo_spark.activate()

# Turn every Row of the underlying RDD into a plain dict; this works around
# https://jira.mongodb.org/browse/HADOOP-276
on_time_dict = on_time_dataframe.rdd.map(lambda record: record.asDict())

# Store the dicts in the on_time_performance collection of agile_data_science
on_time_dict.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

### Inject into Elasticsearch

In [None]:
# Read the Parquet copy of the dataset
on_time_dataframe = spark.read.parquet("../data/On_Time_On_Time_Performance_2015/On_Time_On_Time_Performance_2015.parquet")

# Write the DataFrame to Elasticsearch in overwrite mode; both connector
# settings are passed in a single options() call (the keys contain dots, so
# they are supplied through dict unpacking).
on_time_dataframe.write.format("org.elasticsearch.spark.sql").options(
    **{
        "es.resource": "agile_data_science/on_time_performance",
        "es.batch.size.entries": "100",
    }
).mode("overwrite").save()