In [1]:
### Importing required python package
import pandas as pd
import numpy as np
import json          
from pyspark.sql import SparkSession 

### This code is for POC purposes only. Don't deploy it to a production environment without further tuning
### and exception handling.

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
563,application_1534466702543_2832,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Record the wall-clock start of the run; the final cell subtracts this
# value to report the total elapsed time.
import time
from time import gmtime, strftime

starttime = time.time()

# Last expression of the cell: displays the current UTC time as text.
strftime("%Y-%m-%d %H:%M:%S", gmtime())

'2018-08-29 12:38:43'

In [3]:
## Run configuration: choose between an incremental load and a full load.
## The load cell only checks for the value 'fullload'; any other value takes the other branch.
## Precaution for a full load: manually drop the spark_pax_flight_final hive table before running this process.
## NOTE(review): this notebook creates ek_meals_ops_mod_final, not spark_pax_flight_final --
## confirm which table the precaution is meant to name.
loadtype = "incrementalload"
## Cut-off timestamp for the load, formatted as yyyyMMddHHmmss.
RUN_DATE = "20180630000000"

In [4]:
import datetime

# RUN_DATE carries no timezone information, so the parsed value is a naive datetime.
RUN_DATE_FORMAT = "%Y%m%d%H%M%S"
rundate = datetime.datetime.strptime(RUN_DATE, RUN_DATE_FORMAT)

# Echo the effective run parameters (two output lines, same text as before).
print("The entire load will happen upto {}\nand this is {}".format(rundate, loadtype))

The entire load will happen upto 2018-06-30 00:00:00
and this is incrementalload

In [5]:
### Obtain the SparkSession (an already-running session is reused by getOrCreate).
### This step is required when running from jupyter.
session_builder = SparkSession.builder.appName("Mod SparkSession")
spark = session_builder.getOrCreate()

In [6]:
# Sanity check: display the type of the `spark` session object.
type(spark)

<class 'pyspark.sql.session.SparkSession'>

In [7]:
### Build the mod dataset from documents whose DocType is 'Order'.
### Table Name: meals_mod_data_mvp3_filtered
### The query is capped at 10 rows for testing. Remove the LIMIT clause to bring in the entire dataset.
mod_orders_query = "SELECT q1.doctype, q1.fltKey, q1.channels, jv.value as value FROM meals_mod_data_mvp3_filtered jv LATERAL VIEW JSON_TUPLE(jv.value, 'DocType','FltKey','channels') q1 AS doctype, fltKey, channels where q1.doctype = 'Order' limit 10"
spark_mod = spark.sql(mod_orders_query)

### Clean the mod data: slice flight number, board point and boarding time out of
### `channels` by position (substr(start, length)), then discard the raw column.
### NOTE(review): the offsets assume a fixed layout of the channels string -- confirm against the data.
channels = spark_mod.channels
spark_mod = (
    spark_mod
    .withColumn('flight_number', channels.substr(3, 4))
    .withColumn('board_point', channels.substr(8, 3))
    .withColumn('flight_boarding_time', channels.substr(12, 12))
    .drop("channels")
)

In [None]:
### testing spark_mod data 
# print(spark_mod.count())   ## 1658086  @ 6/17
#spark_mod.show()

In [8]:
# Sanity check: display the type of `spark_mod`.
type(spark_mod)

<class 'pyspark.sql.dataframe.DataFrame'>

In [None]:
def parsing_json(value):
    """Pull the order-level attributes out of one JSON document.

    Parameters
    ----------
    value : str
        JSON text; it is decoded with ``json.loads``.

    Returns
    -------
    list
        Five entries, in the order Category, OrderType, SeatNumber, Status,
        PaxKey. A key that is absent from the document yields ``np.nan``;
        a key that is present is returned exactly as decoded (including
        ``None`` for a JSON null).
    """
    wanted_keys = ("Category", "OrderType", "SeatNumber", "Status", "PaxKey")
    document = json.loads(value)

    # NOTE(review): this function is registered as a UDF returning an array of
    # strings; np.nan presumably surfaces there as the text 'NaN' rather than
    # SQL NULL -- confirm that is what downstream consumers expect.
    return [document.get(key, np.nan) for key in wanted_keys]
    
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType

# Wrap parsing_json as a UDF returning an array of strings
# (the element type is passed as an instance: StringType()).
parsing_udf = udf(parsing_json, ArrayType(StringType()))
mod_with_parsed = spark_mod.withColumn("new", parsing_udf(spark_mod.value))

# Fan the positional array out into named columns; the order here matches
# the order of the list returned by parsing_json.
parsed_order = mod_with_parsed.new
order_columns = ['Category', 'OrderType', 'seatnumber', 'Status', 'PaxKey']
for position, column_name in enumerate(order_columns):
    mod_with_parsed = mod_with_parsed.withColumn(column_name, parsed_order[position])

# The helper array column is no longer needed once it has been unpacked.
spark_mod = mod_with_parsed.drop("new")

spark_mod.printSchema()

In [None]:
def parsemoddata(value):
    """Flatten the ``Items`` array of one JSON document into rows.

    Parameters
    ----------
    value : str
        JSON text; it is decoded with ``json.loads`` and must contain an
        ``"Items"`` key (a missing key raises ``KeyError``).

    Returns
    -------
    list of list
        One inner list per element of ``Items``, holding twelve entries in
        the order Category, CategoryCode, Code, ItemId, ItemName, ItemOrder,
        MenuId, MenuName, Status, SubCategory, CategoryId, preference.
        An attribute that is absent, or present with a JSON null, is
        replaced by ``np.nan``.
    """
    item_fields = (
        'Category', 'CategoryCode', 'Code', 'ItemId', 'ItemName', 'ItemOrder',
        'MenuId', 'MenuName', 'Status', 'SubCategory', 'CategoryId', 'preference',
    )

    def _cell(entry, field):
        # Absent keys and explicit nulls are treated alike: both become np.nan.
        found = entry.get(field)
        return np.nan if found is None else found

    document = json.loads(value)
    return [
        [_cell(entry, field) for field in item_fields]
        for entry in document["Items"]
    ]

In [None]:
# Rebind parsing_udf to the item parser. Its result is a list of lists,
# so the declared return type is an array of string arrays.
item_rows_type = ArrayType(ArrayType(StringType()))
parsing_udf = udf(parsemoddata, item_rows_type)
spark_mod = spark_mod.withColumn("new", parsing_udf(spark_mod["value"]))

# spark_mod.printSchema() 

#spark_mod.show()

In [None]:
# Inspect the schema now that the nested `new` column has been added.
spark_mod.printSchema()

In [None]:
from pyspark.sql.functions import explode

# One output row per inner list of `new` (i.e. per item returned by parsemoddata).
mod_exploded = spark_mod.withColumn('new', explode('new'))

# Unpack the positional item array into named columns; the order follows
# the field order produced by parsemoddata.
item_cell = mod_exploded.new
item_columns = [
    'itemCategory', 'itemCategoryCode', 'itemcode', 'ItemId',
    'ItemName', 'ItemOrder', 'MenuId', 'itemMenuName',
    'itemStatus', 'itemSubCategory', 'itemCategoryId', 'itempreference',
]
for position, column_name in enumerate(item_columns):
    mod_exploded = mod_exploded.withColumn(column_name, item_cell[position])

# Both the helper array and the raw JSON text are dropped once unpacked.
spark_mod = mod_exploded.drop("new", "value")

# spark_mod.printSchema()

# print(spark_mod.count())  ##1834000
spark_mod.show()

In [None]:
### Load the flights-and-pax dataset from the spark_pax_flight_final table.
pax_flight_query = "select * from spark_pax_flight_final "
spark_pax_flight = spark.sql(pax_flight_query)


In [None]:
### testing spark_pax_flight data
# print(spark_pax_flight.count())
# spark_pax_flight.show()
# spark_pax_flight.printSchema() 

In [None]:
# Left outer join: every mod row is kept, with pax/flight columns attached
# where flight number, boarding time, board point and seat number all match.
spark_mod_pax_flight_key = [
    "flight_number",
    "flight_boarding_time",
    "board_point",
    "seatnumber",
]
spark_mod_pax_flight = spark_mod.join(
    spark_pax_flight,
    on=spark_mod_pax_flight_key,
    how="left_outer",
)

In [None]:
### testing spark_mod_pax_flight data
# print(spark_mod_pax_flight.count())
# spark_mod_pax_flight.select("ItemId" , "MenuId").show()

In [None]:
### Build the items dataset.
### Table Name: meals_menu_dish_master
### Only a subset of columns is selected for testing. Extend the query to bring in more columns.
### Recommendation: keep listing the columns explicitly.
### NOTE(review): menuid is aliased to ItemId and ItemId is later used as the join key -- confirm this is intended.
items_query = "select menuid as ItemId, dishcode, dishversion, dishcategory, dishsubcategory, cabinclassname, menucardname, dishname from meals_menu_dish_master"
spark_items = spark.sql(items_query)

In [None]:
# date_format( date_format( t1.flt_schd_dep_date  , 'yyyy-MM-dd HH.mm.ss.S'), 'yyyyMMddHHmmss') 
### Bringing the dataset for menu for history flight
# spark_future_menu = spark.sql("select t1.flt_carrier_code, t1.flt_number as flight_number, CONCAT(SUBSTR(t1.flt_schd_dep_date,1,4) , SUBSTR(t1.flt_schd_dep_date,6,2) , SUBSTR(t1.flt_schd_dep_date,9,2),SUBSTR(t1.flt_schd_dep_date,12,2),SUBSTR(t1.flt_schd_dep_date,15,2) ) as flight_boarding_time, t1.flt_schd_dep_stn as board_point, t1.cabin_class_code, t1.service_category_code, t1.meal_service_name, t1.meal_service_description, t1.menu_name, t1.meal_type_code, t2.menu_id, t2.item_id as ItemId , t2.item_code, t2.item_name, t2.category, t2.subcategory, t2.quantity, t2.cmsm_uom_name, t3.dishcode, t3.dishversion, t3.dishcategory, t3.dishsubcategory, t3.cabinclassname, t3.menucardname, t3.dishname, t3.is_m_level_boolean, t3.approved_boolean, t3.ek_sig_dish_boolean, t3.in_use_boolean, t3.cms_status, t3.cms_createdate, t3.effective_from, t3.effective_to, t3.lw_cal_dish_boolean, t3.menu_text_boolean from flight_menu_master t1 inner join ( select * from menu_itemid_dishcode_master where category = 'Main Course' ) as t2 on t1.menu_id = t2.menu_id inner join ( select * from meals_itemid_details_master where dishcategory = 'Main Course' ) as t3 on t2.item_id = t3.item_id where t1.flt_schd_dep_date <= ( select from_unixtime(unix_timestamp(max(rundate),'yyyyMMddHHmmss' )) from loadtable )") 

In [None]:
from pyspark.sql.functions import broadcast

## Attach the items data to the joined mod/pax/flight dataset.
## spark_items is wrapped in broadcast() as a hint for the join.
## Alternative composite key, kept for reference:
##   [ "flight_number" , "flight_boarding_time" , "board_point" , "ItemId"]
join_mod_flight_pax_items_key = ["ItemId"]
join_mod_flight_pax_items = spark_mod_pax_flight.join(
    broadcast(spark_items),
    on=join_mod_flight_pax_items_key,
    how="left_outer",
)

In [None]:
#print(join_mod_flight_pax_items.count())
# join_mod_flight_pax_items.show()

In [None]:
### Persist the joined mod / pax / flight / items dataset into the Hive table
### ek_meals_ops_mod_final.
join_mod_flight_pax_items.createOrReplaceTempView("mytempTable")

# Compare by value. The previous `is "fullload"` tested object identity, whose
# result depends on string interning and raises a SyntaxWarning on Python 3.8+.
if loadtype != "fullload":
    # Any load type other than 'fullload' replaces the existing table.
    # IF EXISTS keeps the run from aborting when the table is not there yet.
    print("Dropping ek_meals_ops_mod_final table")
    spark.sql("drop table if exists ek_meals_ops_mod_final")

# A full load does not drop anything itself: the CREATE below fails if the
# table already exists, so it has to be removed manually beforehand.
# The session object `spark` is used here (as in every other cell) instead of
# the kernel-provided `sqlContext`.
print("loading ek_meals_ops_mod_final table")
spark.sql("create table ek_meals_ops_mod_final as select * from mytempTable")

In [None]:
# Display the current UTC time as text, marking the end of the processing cells.
from time import gmtime, strftime
strftime("%Y-%m-%d %H:%M:%S", gmtime())

In [None]:
# Elapsed wall-clock time since `starttime` was recorded, truncated to whole seconds
# and then split into minutes and remaining seconds.
endtime = time.time()
diff = int(endtime - starttime)
minutes, seconds = divmod(diff, 60)

print("time taken: {} mins & {} secs ".format(minutes, seconds))