In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions  import *
from pytz import timezone
import pandas as pd
import pyarrow
import datetime
import logging
from datetime import date
import json


In [0]:
%sh 
# Upgrade pip itself before installing the packages imported by the next cell.
pip install --upgrade pip
# Provides the "snowflake://" SQLAlchemy dialect used by the connection URL below.
pip install --upgrade snowflake-sqlalchemy
# Used to serialise ErrorReturn objects in the error handlers.
pip install jsonpickle
# NOTE(review): none of these installs pins a version, so runs are not reproducible.

In [0]:
import jsonpickle
from sqlalchemy import Column
from sqlalchemy import Date
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Numeric
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Time
from sqlalchemy import create_engine
from sqlalchemy import create_engine
from sqlalchemy import inspect
from sqlalchemy.engine.url import URL

In [0]:
%run /arunchandra.kathula@dxc.com/PHDATA/Logging

In [0]:
# Name passed to logger() together with the '/tmp/' directory argument.
custom_logfile_Name ='travel_customlog'
# `logger` is not defined in this notebook; presumably it is provided by the
# Logging notebook pulled in with %run — TODO confirm what the three returned
# values hold. `loggerAtt` is the object used for all .info()/.error() calls below.
loggerAtt, p_logfile, file_date = logger(custom_logfile_Name, '/tmp/')

In [0]:
%sql
SET TIME ZONE 'America/Indianapolis';

key,value
spark.sql.session.timeZone,America/Indianapolis


key,value
spark.sql.session.timeZone,America/Indianapolis


## Error Class

In [0]:
class ErrorReturn:
  """Error record (status, message, origin, timestamp) usable as a notebook exit value."""

  def __init__(self, status, errorMessage, functionName):
    """Store the error details and stamp them with the current time.

    Args:
      status: Status label for the record (callers in this notebook pass 'Error').
      errorMessage: Exception or message; stored as its ``str()`` form.
      functionName: Description of the step that failed.
    """
    self.status = status
    self.errorMessage = str(errorMessage)
    self.functionName = functionName
    # ISO-8601 timestamp taken in the America/Indianapolis zone.
    indianapolis = timezone("America/Indianapolis")
    self.time = datetime.datetime.now(indianapolis).isoformat()

  def exit(self):
    """End the notebook run, returning this record's attributes as a JSON string."""
    payload = json.dumps(vars(self))
    dbutils.notebook.exit(payload)

## Checks

In [0]:
# Module-level registry of named check results collected during the run.
Checks = {}
def Check(**kwargs):
  """Record each keyword argument in ``Checks``, overwriting any existing entry with the same name."""
  Checks.update(kwargs)

In [0]:
def Merge(dict1, dict2):
    """Return a new dict holding the entries of ``dict1`` then ``dict2``.

    When a key occurs in both, the value from ``dict2`` wins. Neither input
    is modified.
    """
    merged = dict(dict1)
    merged.update(dict2)
    return merged

## Widgets for passing Parameters

In [0]:
#dbutils.widgets.removeAll()
# Declare one free-text widget per notebook parameter, each defaulting to "".
for _widget_name in ("fileName", "user", "password", "account",
                     "database_name", "schema_name", "warehouse_name"):
  dbutils.widgets.text(_widget_name, "")

# Copy the current widget values into the module-level names used by later cells.
# NOTE(review): the Snowflake password arrives as a plain-text widget value;
# consider a secret scope instead.
fileName = dbutils.widgets.get("fileName")
user = dbutils.widgets.get("user")
password = dbutils.widgets.get("password")
account = dbutils.widgets.get("account")
database_name = dbutils.widgets.get("database_name")
schema_name = dbutils.widgets.get("schema_name")
warehouse_name = dbutils.widgets.get("warehouse_name")

loggerAtt.info("Parameters from Widgets retrieved")



## Snowflake Connection

In [0]:
# Build the SQLAlchemy engine for Snowflake and record the outcome in Checks.
# On any failure the notebook exits with a JSON payload combining Checks and
# the error details.
try:
  # URL layout:
  # snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>
  # (no role is supplied here).
  # NOTE(review): the widget values are interpolated verbatim, so a user name
  # or password containing URL-reserved characters (e.g. '@', '%') must
  # already be URL-encoded by the caller.
  engine = create_engine(
      'snowflake://{user}:{password}@{account}/{database_name}/{schema_name}?warehouse={warehouse_name}'.format(
      user=user,
      password=password,
      account=account,
      database_name=database_name,
      schema_name=schema_name,
      warehouse_name=warehouse_name)
      )
  # `conn` is not used again in this notebook; the connect() call presumably
  # serves as an up-front connectivity check.
  conn = engine.connect()
  Check(SnowflakeConnection = 1)
except Exception as ex:
  Check(SnowflakeConnection = 0)
  loggerAtt.error(ex)
  err = ErrorReturn('Error', ex,'Connecting to Snowflake using sqlAlchemy')
  # Round-trip through jsonpickle to turn the ErrorReturn into a plain dict.
  errJson = json.loads(jsonpickle.encode(err))
  # dbutils.notebook.exit expects a string, so serialise the merged payload
  # (same approach as ErrorReturn.exit) instead of passing the dict itself.
  dbutils.notebook.exit(json.dumps(Merge(Checks,errJson)))

## Read File

In [0]:
def readFile(file_location, infer_schema, first_row_is_header, delimiter,schema, file_type):
  """Load ``file_location`` through the Spark reader using an explicit schema.

  Args:
    file_location: Path handed to ``load``.
    infer_schema: Accepted for signature compatibility; not used by the body.
    first_row_is_header: Value for the reader's "header" option.
    delimiter: Value for the reader's "sep" option.
    schema: Schema applied to the reader.
    file_type: Format name handed to ``spark.read.format``.

  Returns:
    Whatever ``load`` returns for the configured reader.
  """
  reader = (
    spark.read.format(file_type)
    .option("mode", "PERMISSIVE")
    .option("header", first_row_is_header)
    .option("dateFormat", "yyyyMMdd")
    .option("sep", delimiter)
    .schema(schema)
  )
  return reader.load(file_location)

## Create Table on Snowflake

In [0]:
def CreateTable():
  """Declare the FLIGHTS, AIRPORTS and AIRLINES tables and create them on Snowflake.

  Each table is registered on a fresh ``MetaData`` object; ``create_all`` then
  issues the DDL through the module-level ``engine``. ``create_all`` checks for
  existence first by default, so tables that already exist are left untouched.

  Column names follow the lower-cased CSV headers declared in the Spark schemas
  (``FLIGHT_NUMBER``, ``TAIL_NUMBER``, ``ARRIVAL_TIME``, ``ARRIVAL_DELAY``, ...)
  so that frames appended with ``DataFrame.to_sql`` line up with the columns.

  Returns:
    None.
  """
  metadata = MetaData()

  loggerAtt.info('Creating FLIGHTS table on Snowflake')
  # NOTE(review): flight_number is the only primary-key column; confirm it is
  # really unique per row in the source data.
  Table('flights', metadata,
    Column('year', Integer, nullable=True),
    Column('month', Integer, nullable=True),
    Column('day', Integer, nullable=True),
    Column('day_of_week', Integer, nullable=True),
    Column('airline', String(50), nullable=False),
    Column('flight_number', String(50), primary_key=True),
    Column('tail_number', String(50), nullable=True),
    Column('origin_airport', String(50), nullable=True),
    Column('destination_airport', String(50), nullable=False),
    Column('scheduled_departure', String(20), nullable=True),
    Column('departure_time', String(20), nullable=True),
    Column('departure_delay', Integer, nullable=True),
    Column('taxi_out', Integer, nullable=True),
    Column('wheels_off', String(50), nullable=True),
    Column('scheduled_time', Integer, nullable=True),
    Column('elapsed_time', Integer, nullable=True),
    Column('air_time', Integer, nullable=True),
    Column('distance', Integer, nullable=True),
    Column('wheels_on', String(50), nullable=True),  # kept as String to match wheels_off
    Column('taxi_in', Integer, nullable=True),
    Column('scheduled_arrival', Integer, nullable=True),
    Column('arrival_time', String(50), nullable=True),
    Column('arrival_delay', Integer, nullable=True),
    Column('diverted', Integer, nullable=True),
    Column('cancelled', Integer, nullable=True),
    Column('cancellation_reason', String(500), nullable=True),
    Column('air_system_delay', Integer, nullable=True),
    Column('security_delay', Integer, nullable=True),
    Column('airline_delay', Integer, nullable=True),
    Column('late_aircraft_delay', Integer, nullable=True),
    Column('weather_delay', Integer, nullable=True),
  )

  loggerAtt.info('Creating AIRPORTS table on Snowflake')
  Table('airports', metadata,
    Column('iata_code', String(10), primary_key=True),
    Column('airport', String(50), nullable=True),
    Column('city', String(50), nullable=True),
    Column('state', String(50), nullable=True),
    Column('country', String(50), nullable=True),
    # An explicit scale is required: a bare Numeric(20) has scale 0 and would
    # drop the fractional part of the coordinates.
    Column('latitude', Numeric(20, 10), nullable=True),
    Column('longitude', Numeric(20, 10), nullable=True),
  )

  loggerAtt.info('Creating AIRLINES table on Snowflake')
  Table('airlines', metadata,
    Column('iata_code', String(10), primary_key=True),
    Column('airline', String(50), nullable=False),
  )

  # Emit CREATE TABLE for every table registered above that does not exist yet.
  metadata.create_all(engine)

### Schema Definition

In [0]:
# Explicit Spark schemas for the three CSV datasets; field order follows the
# column order expected in each file.

# NOTE(review): ARRIVAL_DELAY and AIRLINE_DELAY are read as strings here while
# CreateTable() declares the matching columns as Integer — confirm intent.
flights_schema = StructType([
                    StructField("YEAR",IntegerType(),False),
                    StructField("MONTH",IntegerType(),False),
                    StructField("DAY",IntegerType(),False),
                    StructField("DAY_OF_WEEK",IntegerType(),False),
                    StructField("AIRLINE",StringType(),False),
                    StructField("FLIGHT_NUMBER",StringType(),False),
                    StructField("TAIL_NUMBER",StringType(),False),
                    StructField("ORIGIN_AIRPORT",StringType(),False),
                    StructField("DESTINATION_AIRPORT",StringType(),False),
                    StructField("SCHEDULED_DEPARTURE",StringType(),False),
                    StructField("DEPARTURE_TIME",StringType(),False),
                    StructField("DEPARTURE_DELAY",IntegerType(),False),
                    StructField("TAXI_OUT",IntegerType(),False),
                    StructField("WHEELS_OFF",StringType(),False),
                    StructField("SCHEDULED_TIME",IntegerType(),False),
                    StructField("ELAPSED_TIME",IntegerType(),False),
                    StructField("AIR_TIME",IntegerType(),False),
                    StructField("DISTANCE",IntegerType(),False),
                    StructField("WHEELS_ON",StringType(),False),
                    StructField("TAXI_IN",IntegerType(),False),
                    StructField("SCHEDULED_ARRIVAL",IntegerType(),False),
                    StructField("ARRIVAL_TIME",StringType(),False),
                    StructField("ARRIVAL_DELAY",StringType(),False),
                    StructField("DIVERTED",IntegerType(),False),
                    StructField("CANCELLED",IntegerType(),False),
                    StructField("CANCELLATION_REASON",StringType(),False),
                    StructField("AIR_SYSTEM_DELAY",IntegerType(),False),
                    StructField("SECURITY_DELAY",IntegerType(),False),
                    StructField("AIRLINE_DELAY",StringType(),False),
                    StructField("LATE_AIRCRAFT_DELAY",IntegerType(),False),
                    StructField("WEATHER_DELAY",IntegerType(),False)
 ])

# Latitude/longitude are read as doubles: coordinates are presumably decimal
# degrees (CreateTable() stores them in Numeric columns), which an integer
# field cannot parse.
airports_schema =   StructType([StructField("IATA_CODE",StringType(),False),
                    StructField("AIRPORT",StringType(),False),
                    StructField("CITY",StringType(),False),
                    StructField("STATE",StringType(),False),
                    StructField("COUNTRY",StringType(),False),
                    StructField("LATITUDE",DoubleType(),False),
                    StructField("LONGITUDE",DoubleType(),False)
 ])

airlines_schema =  StructType([StructField("IATA_CODE",StringType(),False),
                    StructField("AIRLINE",StringType(),False)
 ])


## Declarations

In [0]:
# Reader settings handed to readFile() for every dataset.
file_type = "csv"
# Passed to readFile(), whose body does not use it (an explicit schema is supplied instead).
infer_schema = "false"
# Used as the reader's "header" option.
first_row_is_header = "true"
# Used as the reader's "sep" option.
delimiter = ","

In [0]:
#airports_location = "/FileStore/tables/airports.csv"
#airlines_location = "/FileStore/tables/airlines.csv"
#flights_location = "/FileStore/tables/flights/"

## Main

In [0]:
def fileLocation():
  """Resolve the input path for the dataset named in the ``fileName`` parameter.

  ``fileName`` is searched for the keywords "airports", "airlines" and
  "flights", in that order; the first match decides the path. The result is
  also stored in the module-level ``file_location`` for callers that read the
  global.

  Returns:
    The path string for the matched dataset, or ``None`` when ``fileName``
    contains none of the keywords (previously this path raised NameError or
    returned a stale value left by an earlier call).
  """
  global file_location
  # (keyword searched in fileName, path to read, log message)
  known_datasets = (
    ("airports", "/FileStore/tables/airports.csv", 'Processing AIRPORTS dataset'),
    ("airlines", "/FileStore/tables/airlines.csv", 'Processing AIRLINES dataset'),
    ("flights", "/FileStore/tables/flights/", 'Processing FLIGHTS dataset'),
  )
  for keyword, location, message in known_datasets:
    if keyword in fileName:
      file_location = location
      loggerAtt.info(message)
      return file_location
  # No keyword matched: reset the global so no stale path is reused.
  file_location = None
  loggerAtt.info('Nothing to Process')
  return file_location

In [0]:
def inspectingTables():
  """Log the tables present on the database and create any expected table that is missing.

  The expected tables are "flights", "airports" and "airlines". Names reported
  by the inspector are compared case-insensitively. When at least one expected
  table is absent, ``CreateTable()`` is called once; it declares all three
  tables and creates those that do not exist.

  Previously the check was inverted: it ran over the tables that already
  existed, flagged unrelated ones as "missing", and never created anything on
  an empty schema.

  Returns:
    None.
  """
  expected_tables = ("flights", "airports", "airlines")
  inspector = inspect(engine)

  present_tables = set()
  for table_name in inspector.get_table_names():
    loggerAtt.info(f'Present {table_name} on Database')
    present_tables.add(table_name.lower())

  missing_tables = [name for name in expected_tables if name not in present_tables]
  if not missing_tables:
    return

  for table_name in missing_tables:
    loggerAtt.info(f'Missing table {table_name} on Database')
    loggerAtt.info(f'Creating Missing table {table_name} on Database')
  # One call is enough: CreateTable() handles all three tables.
  CreateTable()

In [0]:
# Driver: for each dataset whose keyword occurs in `fileName`, read the CSV
# with Spark, convert it to pandas and append it to the matching Snowflake
# table. The outcome is recorded in Checks; on the first failure the notebook
# exits with a JSON payload combining Checks and the error details.
if __name__ == "__main__":
  loggerAtt.info('======== Initiating file Movement to Snowflake from Databricks ========')

  # (keyword searched in fileName, Spark schema, Snowflake table / log label,
  #  key recorded in Checks, optional cap on the number of rows uploaded)
  # NOTE(review): only the first 500000 FLIGHTS rows are uploaded; the rest of
  # the frame is silently left out — confirm this cap is intended.
  _datasets = (
    ("airports", airports_schema, 'AIRPORTS', 'Airports', None),
    ("airlines", airlines_schema, 'AIRLINES', 'Airlines', None),
    ("flights", flights_schema, 'FLIGHTS', 'Flights', 500000),
  )

  for _keyword, _schema, _table, _check_key, _row_limit in _datasets:
    try:
      if fileName.find(_keyword) != -1:
        file_location = fileLocation()
        loggerAtt.info(f'File Location {file_location}')
        df = readFile(file_location, infer_schema, first_row_is_header, delimiter, _schema, file_type)
        df=df.toPandas()
        loggerAtt.info(f'Count of records in {_table} dataframe: {df.shape}')
        _to_upload = df if _row_limit is None else df.head(_row_limit)
        _to_upload.to_sql(_table,con=engine, if_exists='append', chunksize= 15000, index=False)
        Check(**{_check_key: 1})
    except Exception as ex:
      Check(**{_check_key: 0})
      loggerAtt.error(ex)
      err = ErrorReturn('Error', ex,f'Issue with Ingesting {_table} data to Snowflake')
      # Round-trip through jsonpickle to turn the ErrorReturn into a plain dict.
      errJson = json.loads(jsonpickle.encode(err))
      # dbutils.notebook.exit expects a string, so serialise the merged payload
      # (same approach as ErrorReturn.exit) instead of passing the dict itself.
      dbutils.notebook.exit(json.dumps(Merge(Checks,errJson)))

loggerAtt.info('======== Processing Completed ========')