# Batch process with Spark

In [None]:
!load pyspark/pyjob.py
from pyspark.sql import SparkSession
from operator import add
import sys
import pandas as pd
import json
from pyspark.sql.functions import expr

# Constants
APP_NAME = "Synthesize"


def _date_time_columns():
    """Return the ``date`` and ``time`` columns made by splitting ``q_time`` on 'T'."""
    return [
        expr("(split(q_time, 'T'))[0]").alias("date"),
        expr("(split(q_time, 'T'))[1]").alias("time"),
    ]


def main(spark, input_path="s3://delays-project-2/2017/*/*/*/*",
         output_root="s3://delays-project-parquet/pyspark", mode="overwrite"):
    """Build the airport, weather and delay tables from the raw JSON snapshots.

    Reads every JSON file matched by ``input_path`` into the global temp view
    ``global_temp.s3content`` and writes, under ``output_root``:
    ``airports.parquet`` and ``airports2.csv`` (airport metadata),
    ``weather.csv``, ``delays.csv`` and ``weather_delays0000.csv``
    (weather and delay status combined).

    Args:
        spark: the active SparkSession.
        input_path: glob of raw JSON files to read (defaults to the 2017 data).
        output_root: prefix that receives every output table.
        mode: Spark save mode applied to every write. Defaults to
            ``"overwrite"`` so that the scheduled daily re-run replaces the
            previous output; Spark's own default ("error") makes the job fail
            as soon as the target paths already exist.
    """
    df = spark.read.json(input_path)

    # Replace rather than create: createGlobalTempView raises if the view
    # already exists, e.g. when main() runs twice in the same application.
    df.createOrReplaceGlobalTempView("s3content")

    out = output_root.rstrip("/")

    # airport table
    airports = spark.sql("SELECT DISTINCT IATA, ICAO, city, name, state FROM global_temp.s3content")
    airports.write.mode(mode).parquet(out + "/airports.parquet")
    airports.coalesce(1).write.mode(mode).csv(out + "/airports2.csv")

    # weather table; tempF is the first space-separated token of the temp text
    weather = spark.sql("SELECT DISTINCT IATA, q_time, weather.temp, weather.visibility, weather.weather, weather.wind, weather.meta.credit, weather.meta.updated, weather.meta.url FROM global_temp.s3content")
    weather_formatted = weather.select(*(_date_time_columns() + [
        expr("(split(temp, ' '))[0]").alias("tempF"),
        expr("IATA"),
        expr("updated"),
    ]))
    weather_formatted.coalesce(1).write.mode(mode).csv(out + "/weather.csv")

    # delays table
    delays = spark.sql("SELECT DISTINCT IATA, q_time, delay, status.avgDelay as avgDelay , status.closureBegin as closureBegin, status.closureEnd as closureEnd, status.endTime as endTime, status.maxDelay as maxDelays, status.minDelay as minDelays, status.reason as reason, status.trend as trend, status.type as type FROM global_temp.s3content")
    delays_formatted = delays.select(*(_date_time_columns() + [
        expr(column) for column in (
            "IATA", "delay", "avgDelay", "closureBegin", "closureEnd",
            "endTime", "maxDelays", "minDelays", "reason", "trend", "type",
        )
    ]))
    delays_formatted.coalesce(1).write.mode(mode).csv(out + "/delays.csv")

    # weather and delays combined in one table; the column order must match
    # the --columns list passed to mysqlimport when this CSV is loaded.
    weather_delays = spark.sql("SELECT DISTINCT IATA, q_time, delay, status.reason as reason, weather.temp as temp, weather.visibility as visibility, weather.weather as weather, weather.wind as wind, weather.meta.credit as credit, weather.meta.updated as updated, weather.meta.url as url FROM global_temp.s3content")
    weather_delays_formatted = weather_delays.select(*(_date_time_columns() + [
        expr(column) for column in (
            "IATA", "delay", "reason", "temp", "visibility", "weather",
            "wind", "credit", "updated", "url",
        )
    ]))
    weather_delays_formatted.coalesce(1).write.mode(mode).csv(out + "/weather_delays0000.csv")


if __name__ == "__main__":
    # Launch with: spark-submit pyjob.py
    # Results are written below s3://delays-project-parquet/
    spark = (
        SparkSession.builder
        .appName(APP_NAME)
        .getOrCreate()
    )
    # Only errors are reported on the driver log.
    spark.sparkContext.setLogLevel('ERROR')
    main(spark)

## Loading the output into MySQL

In [None]:
!load mysqlimport.sh
# Load the combined weather/delay CSV produced by pyjob.py into MySQL.
# mysqlimport derives the table name from the file name, so weather_delay.csv
# is loaded into the `weather_delay` table; --columns lists the CSV fields in
# the order the Spark job writes them.
# NOTE(review): pyjob.py writes weather_delays0000.csv (Spark writes this as a
# directory of part files); it must be fetched locally and renamed to
# weather_delay.csv before this runs — TODO confirm that step exists.
# MYSQL_DB must name the target database: mysqlimport expects it before the
# file list. -p prompts for the password; for the unattended cron run store
# the credentials in a MySQL option file (e.g. ~/.my.cnf) instead.
mysqlimport --local --compress --user=flask -p \
     --host=flasktest.czmfd7dvqdgw.us-east-1.rds.amazonaws.com \
     --port=3306 \
     --columns=date,time,iata,delay,reason,temp,visibility,weather,wind,credit,updated,url \
     --fields-terminated-by=',' \
     --fields-optionally-enclosed-by='"' \
     "${MYSQL_DB:?set MYSQL_DB to the target database}" \
     weather_delay.csv

> Cron jobs that run every day

```0 4 * * * spark-submit ~/pyspark/pyjob.py```

```0 5 * * * bash mysqlimport.sh```

# Front End:

# AWS Elastic Beanstalk

### Model definition

In [None]:
class WeatherDelay(db.Model):
    """One row of the combined weather/delay table.

    The data columns (date, time, iata, delay, reason, temp, visibility,
    weather, wind, credit, updated, url) follow the same order as the
    --columns list used when the CSV is imported with mysqlimport; ``id`` is
    the primary key.
    """
    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date)
    time = db.Column(db.Time)
    iata = db.Column(db.String(4))  # airport code
    delay = db.Column(db.String(6))  # delay flag kept as text — TODO confirm values (e.g. 'true'/'false')
    reason = db.Column(db.String(64))
    temp = db.Column(db.String(64))
    visibility = db.Column(db.String(64))
    weather = db.Column(db.String(64))
    wind = db.Column(db.String(64))
    credit = db.Column(db.String(64))
    updated = db.Column(db.String(64))
    url = db.Column(db.String(64))  # NOTE(review): 64 chars may be short for URLs — confirm

    def __init__(self, date, time, iata, delay, reason, temp, visibility, weather,
                 wind, credit, updated, url):
        # Assign every column in declaration order.
        assignments = (
            ("date", date),
            ("time", time),
            ("iata", iata),
            ("delay", delay),
            ("reason", reason),
            ("temp", temp),
            ("visibility", visibility),
            ("weather", weather),
            ("wind", wind),
            ("credit", credit),
            ("updated", updated),
            ("url", url),
        )
        for attr_name, value in assignments:
            setattr(self, attr_name, value)

### View definitions

In [None]:
@application.route('/temps/<iata>')
def temps_by_iata(iata):
    """Render ``temps.html`` with the ``DailyTemp`` rows for one airport.

    Args:
        iata: airport code taken from the URL path; rows are filtered on it
            and ordered by ascending ``id``.

    Returns:
        The rendered ``temps.html`` page, with the rows passed as ``results``.
    """
    # Materialise the rows before closing the session. Passing the lazy Query
    # to the template (as before) ran the query a second time after close();
    # the debug print loop that also iterated it has been removed.
    results = (DailyTemp.query
               .filter_by(iata=iata)
               .order_by(DailyTemp.id.asc())
               .all())
    db.session.close()

    return render_template('temps.html', results=results)

In [None]:
!load flask/templates/delays.html
    <div align="center">
        <h2>Delays recorded in DBs:</h2>
      </div>

<hr>
<div class="col-md-4">
  <table class="table table-striped table-hover">
    <thead>
      <tr>
        <th>Airport</th>
        <th>Date</th>
        <th>Delays</th>
      </tr>
    </thead>
    <tbody>
    {# delay is a short string column, so its text form is compared:
       the old `z.delay == 0` never matched a string and always showed True. #}
    {% for z in results %}
      <tr>
        <td>{{ z.iata }}</td>
        <td>{{ z.date }}</td>
        <td>{% if z.delay|string|lower in ['false', '0', '', 'none'] %}False{% else %}True{% endif %}</td>
      </tr>
    {% endfor %}
    </tbody>
  </table>
</div>

```$ python db_create.py```

```$ eb deploy```

<img src='../images/eb.png'>

[Back home](https://github.com/gth158a/data_eng-airport_delays)