# 2. Stream Consumption & Writing to Cassandra 

## 2.0 Cassandra workflow

In [1]:
import pyspark
import time
import os


os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row


conf = SparkConf() \
    .setAppName("CassandraWriter") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
    
spark = SparkContext(conf=conf)
sqlContext=SQLContext(spark)

data_path = "../Data"

In [3]:
from pyspark.sql import DataFrame
import functools


# Define helper function to concatenate streaming DataFrames
def unionAll(df_list):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), df_list)


dfs = []
year = 2008
for month in range(1, 13):
    csv_path = data_path + "/" + str(year) + "/" + str(month)

#         Use of Spark static DataFrames because aggregation operations (e.g., JOINS) are not supported for streaming datasets 
    dfs.append(sqlContext.read.options(header='True', inferSchema='True', delimiter=',').csv(data_path + "/" + str(year) + "/" + str(month) + "/" + str(month) + ".csv"))
        
df = unionAll(dfs)

In [4]:
df.createOrReplaceTempView("flights_2008")

(Optional) To save data to Cassandra, please run the following cell:

In [5]:
# # Cassandra requires a unique primary key, that we add here + column names must be in lowercase format
# from pyspark.sql.functions import monotonically_increasing_id 


# df_index = df.toDF(*[col.lower() for col in df.columns]).withColumn("id", monotonically_increasing_id())
# df_index.createOrReplaceTempView("flights_2008")
# df_index.write.format("org.apache.spark.sql.cassandra").mode('overwrite').options(table="flights_2008", keyspace="task2").save()

## 3.2 

Tom wants to travel from airport X to airport Z. However, Tom also wants to stop at airport Y for some sightseeing on the way. More concretely, Tom has the following requirements (for specific queries, see the Task 1 Queries and Task 2 Queries):

a) The second leg of the journey (flight Y-Z) must depart two days after the first leg (flight X-Y). For example, if X-Y departs on January 5, 2008, Y-Z must depart on January 7, 2008.

b) Tom wants his flights scheduled to depart airport X before 12:00 PM local time and to depart airport Y after 12:00 PM local time.

c) Tom wants to arrive at each destination with as little delay as possible. You can assume you know the actual delay of each flight.

Your mission (should you choose to accept it!) is to find, for each X-Y-Z and day/month (dd/mm) combination in the year 2008, the two flights (X-Y and Y-Z) that satisfy constraints (a) and (b) and have the best individual performance with respect to constraint (c), if such flights exist.

### First leg

In [7]:
table_3_2_xy = sqlContext.sql("SELECT\
                                  ORIGIN,\
                                  FL_DATE,\
                                  CRS_DEP_TIME,\
                                  OP_UNIQUE_CARRIER,\
                                  OP_CARRIER_FL_NUM,\
                                  DEST,\
                                  ARR_DELAY,\
                                  rank() OVER (PARTITION BY ORIGIN, DEST, YEAR, MONTH ORDER BY ARR_DELAY ASC) AS rank\
                              FROM flights_2008\
                              WHERE CRS_DEP_TIME < 1200 AND CANCELLED = 0")

table_3_2_xy.createOrReplaceTempView("xy")

In [8]:
table_3_2_xy.show()


+------+-------+------------+-----------------+-----------------+----+---------+----+
|ORIGIN|FL_DATE|CRS_DEP_TIME|OP_UNIQUE_CARRIER|OP_CARRIER_FL_NUM|DEST|ARR_DELAY|rank|
+------+-------+------------+-----------------+-----------------+----+---------+----+
|   ABE|8/19/08|         630|               EV|             4171| ATL|      -18|   1|
|   ABE| 8/5/08|         630|               EV|             4171| ATL|      -17|   2|
|   ABE| 8/1/08|         630|               EV|             4171| ATL|      -14|   3|
|   ABE| 8/9/08|         630|               EV|             4171| ATL|      -14|   3|
|   ABE|8/11/08|         630|               EV|             4171| ATL|      -14|   3|
|   ABE|8/29/08|         630|               EV|             4171| ATL|      -14|   3|
|   ABE| 8/3/08|         630|               EV|             4171| ATL|      -13|   7|
|   ABE| 8/6/08|         630|               EV|             4171| ATL|      -12|   8|
|   ABE|8/31/08|         630|               EV|       

### Second leg

In [10]:
table_3_2_yz = sqlContext.sql("SELECT\
                                  ORIGIN,\
                                  FL_DATE,\
                                  CRS_DEP_TIME,\
                                  OP_UNIQUE_CARRIER,\
                                  OP_CARRIER_FL_NUM,\
                                  DEST,\
                                  ARR_DELAY,\
                                  rank() OVER (PARTITION BY ORIGIN, DEST, YEAR, MONTH ORDER BY ARR_DELAY ASC) AS rank\
                              FROM flights_2008\
                              WHERE CRS_DEP_TIME > 1200 AND CANCELLED = 0")

table_3_2_yz.createOrReplaceTempView("yz")

In [11]:
table_3_2_yz.show()

+------+-------+------------+-----------------+-----------------+----+---------+----+
|ORIGIN|FL_DATE|CRS_DEP_TIME|OP_UNIQUE_CARRIER|OP_CARRIER_FL_NUM|DEST|ARR_DELAY|rank|
+------+-------+------------+-----------------+-----------------+----+---------+----+
|   ABE| 8/9/08|        1545|               EV|             4598| ATL|      -24|   1|
|   ABE|8/13/08|        1545|               EV|             4598| ATL|      -12|   2|
|   ABE|8/16/08|        1545|               EV|             4598| ATL|      -11|   3|
|   ABE|8/21/08|        1545|               EV|             4598| ATL|      -11|   3|
|   ABE|8/23/08|        1545|               EV|             4598| ATL|      -10|   5|
|   ABE| 8/6/08|        1545|               EV|             4598| ATL|       -9|   6|
|   ABE|8/11/08|        1545|               EV|             4598| ATL|       -8|   7|
|   ABE|8/19/08|        1545|               EV|             4598| ATL|       -7|   8|
|   ABE|8/29/08|        1545|               EV|       

### Join legs and register to Cassandra

In [12]:
table_3_2 = sqlContext.sql("SELECT\
                              xy.ORIGIN AS x,\
                              xy.DEST AS y,\
                              yz.DEST AS z,\
                              xy.FL_DATE AS xy_fl_date,\
                              xy.CRS_DEP_TIME AS xy_dep_time,\
                              xy.OP_UNIQUE_CARRIER AS carrier_xy,\
                              xy.OP_CARRIER_FL_NUM AS flight_xy,\
                              yz.FL_DATE AS yz_fl_date,\
                              yz.CRS_DEP_TIME AS yz_dep_time,\
                              yz.OP_UNIQUE_CARRIER AS carrier_yz,\
                              yz.OP_CARRIER_FL_NUM AS flight_yz,\
                              xy.ARR_DELAY + yz.ARR_DELAY AS total_delay\
                          FROM xy JOIN yz ON\
                               xy.rank = 1 AND yz.rank = 1 AND xy.DEST = yz.ORIGIN AND date_add(xy.FL_DATE, 2) = yz.FL_DATE")

table_3_2.createOrReplaceTempView("flights_3_2")


(Optional) To save data to Cassandra, please run the following cell:

In [None]:
# table_3_2.write.format("org.apache.spark.sql.cassandra").mode('overwrite').options(table="flights_3_2", keyspace="task2").save()

(Optional) To extract data from Cassandra, please run the following cell:

In [None]:
# query_3_2 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="flights_3_2", keyspace="task2").load()
# query_3_2.createOrReplaceTempView("flights_3_2")

And we run the query:

In [51]:
sqlContext.sql("SELECT x, y, z, xy_fl_date, carrier_xy, flight_xy, yz_fl_date, carrier_yz, flight_yz, total_delay\
                FROM flights_3_2 \
                WHERE (x='BOS' AND y='ATL' AND z='LAX' AND xy_FL_DATE='4/3/08')\
                OR (x='PHX' AND y='JFK' AND z='MSP' AND xy_FL_DATE='9/7/08')\
                OR (x='DFW' AND y='STL' AND z='ORD' AND xy_FL_DATE='1/14/08')\
                OR (x='LAX' AND y='MIA' AND z='LAX' AND xy_FL_DATE='5/16/08')").show()

+---+---+---+----------+----------+---------+----------+----------+---------+-----------+
|  x|  y|  z|xy_fl_date|carrier_xy|flight_xy|yz_fl_date|carrier_yz|flight_yz|total_delay|
+---+---+---+----------+----------+---------+----------+----------+---------+-----------+
|BOS|ATL|LAX|    4/3/08|        FL|      270|    4/5/08|        FL|       40|          5|
|PHX|JFK|MSP|    9/7/08|        B6|      178|    9/9/08|        NW|      609|        -42|
|DFW|STL|ORD|   1/14/08|        AA|     1336|   1/16/08|        AA|     2245|        -19|
|LAX|MIA|LAX|   5/16/08|        AA|      280|   5/18/08|        AA|      456|         -9|
+---+---+---+----------+----------+---------+----------+----------+---------+-----------+

