In [0]:
# basic libraries 
from ftplib import FTP
import os
import shutil
import py7zr
from delta.tables import *
from pyspark.sql import functions as psf
import icecream
import pandas as pd 
from pyspark.sql.types import *

In [0]:
### ftp parameters 
# Credentials are taken from the environment when FTP_USERNAME / FTP_PASSWORD are set,
# so they do not have to be stored in the notebook; the placeholders below are only a fallback.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is the usual alternative.
ftpServerUrl = 'ftpupload.net'
ftpServerPort = 21
ftpPath = '/htdocs/nhsgp/'
ftpUserName = os.environ.get('FTP_USERNAME', 'topsecret***')
ftpPassword = os.environ.get('FTP_PASSWORD', 'topsecret***')

In [0]:
#get ftp file list and download every matching file into the local /dbfs/ folder
localPath = '/dbfs/'
ftp = None
try:
  # connect explicitly so the configured ftpServerPort is honoured (FTP(host) always uses port 21)
  ftp = FTP()
  ftp.connect(ftpServerUrl, ftpServerPort)
  ftp.login(user=ftpUserName, passwd=ftpPassword)
  ftp.cwd(ftpPath)

  # single .7z archives, csv/json files, split archive volumes (anything containing
  # 'PDPI BNFT.7z', presumably '<name>.7z.001', '.002', ...) and the BNF/SNOMED mapping file
  files = [f for f in ftp.nlst()
           if f.endswith(('.7z', '.csv', '.json'))
           or 'PDPI BNFT.7z' in f
           or 'BNF Snomed Mapping data' in f]

  print('downloading:')
  for f in files:
    print(f)
    # 'with' closes the local file even when the transfer fails midway
    with open(localPath + f, "wb") as localfile:
      ftp.retrbinary('RETR ' + f, localfile.write, 1024)

except Exception as e:
  # still best effort (the notebook continues), but report what actually went wrong
  print("FTP Error: ", e)
finally:
  if ftp is not None:
    try:
      ftp.quit()
    except Exception:
      # connection never opened or already dropped: just release the socket
      ftp.close()


In [0]:
# extract single 7z files 
def extract7zFile(filePathAndName, fileRemoved, extractPath='/dbfs/'):
  """Extract a single .7z archive and optionally delete it afterwards.

  Args:
    filePathAndName: full path of the archive; anything not ending in '.7z' is rejected.
    fileRemoved: when True, the archive file is deleted after a successful extraction.
    extractPath: folder the archive contents are extracted into (default '/dbfs/').

  Non-.7z paths only print 'Not a 7Z file!' and return without touching the file.
  """
  if not filePathAndName.endswith('.7z'):
    print('Not a 7Z file!')
    return
  # context manager closes the archive even if extraction raises
  with py7zr.SevenZipFile(filePathAndName, mode='r') as archive:
    archive.extractall(path=extractPath)
  if fileRemoved:
    os.remove(filePathAndName)
    
    
    
def extract7zMultiVolumn(filePath, fileName, fileRemoved):
  """Reassemble a split 7z archive from its volumes and extract it.

  Args:
    filePath: folder holding the volumes; the combined archive 'tempAll.7z' is written here too.
    fileName: common name part of the volumes, e.g. 'T201901PDPI BNFT'. Only files
      containing both this text and '.7z' are treated as volumes, in sorted name order.
    fileRemoved: when True, the combined archive (via extract7zFile) and the volumes are deleted.

  If no volumes are found, a message is printed and nothing is extracted.
  """
  tempName = 'tempAll.7z'
  # require '.7z' so an already extracted '<fileName>.csv' (or the temp archive itself)
  # is never concatenated into the archive
  volumeFiles = sorted(
      f for f in os.listdir(filePath)
      if fileName in f and '.7z' in f and f != tempName)
  if not volumeFiles:
    print('No 7z volumes found for {}'.format(fileName))
    return

  tempFilePath = os.path.join(filePath, tempName)
  # 'wb' (not 'ab') so a leftover combined archive from an earlier run is overwritten
  with open(tempFilePath, 'wb') as outfile:
    for fname in volumeFiles:
      with open(os.path.join(filePath, fname), 'rb') as infile:
        # stream the volume instead of loading it fully into memory
        shutil.copyfileobj(infile, outfile)

  extract7zFile(tempFilePath, fileRemoved)

  #remove multi-volume files 
  if fileRemoved:
    for fname in volumeFiles:
      os.remove(os.path.join(filePath, fname))
      
      
#convert xlsx to csv due to poor performance of spark-excel 
def convertXlsxToCsv(filepath, filename, sheetName='November 20', outputPath=None):
  """Convert one worksheet of an .xlsx workbook to csv, then delete the workbook.

  Args:
    filepath: folder containing the workbook.
    filename: workbook file name; a trailing '.xlsx' is replaced by '.csv' for the output.
    sheetName: worksheet to export (defaults to the 'November 20' sheet used so far).
    outputPath: folder for the csv; defaults to `filepath`. (The previous version wrote to
      the global `localPath`, which is the same '/dbfs/' folder in this notebook.)

  The pandas index is written as the csv's first, unnamed column.
  """
  inputFile = os.path.join(filepath, filename)
  targetDir = filepath if outputPath is None else outputPath
  # strip only the suffix, not every '.xlsx' occurring in the name
  baseName = filename[:-len('.xlsx')] if filename.endswith('.xlsx') else filename
  outputFile = os.path.join(targetDir, baseName + '.csv')

  sheetDf = pd.read_excel(inputFile, sheet_name=sheetName, engine='openpyxl')
  sheetDf.to_csv(outputFile)
  # delete the workbook only after the csv exists, so a failed write does not lose the source
  os.remove(inputFile)


In [0]:
# reassemble and extract the split monthly prescribing archives, deleting the volumes afterwards
filePath = '/dbfs/'
multiVolumeArchives = ['T201901PDPI BNFT', 'T201902PDPI BNFT', 'T201903PDPI BNFT']
for fileName in multiVolumeArchives:
  extract7zMultiVolumn(filePath, fileName, True)

#convert xlsx files to csv 
for f in os.listdir(filePath):
  if f.endswith('.xlsx'):
    convertXlsxToCsv(filePath, f)
    
    

In [0]:
# move the downloaded/extracted csv and json files from the local /dbfs/ folder
# into the dbfs:/tmp/ staging folder, deleting each local copy after it is copied
stagedNames = [name for name in os.listdir('/dbfs') if name.endswith(('.csv', '.json'))]
for name in stagedNames:
  dbutils.fs.cp("file:/dbfs/" + name, "dbfs:/tmp/")
  os.remove('/dbfs/' + name)

In [0]:
#check path 
def checkPathExist(checkPath):
  """Return True if `checkPath` can be listed with dbutils.fs.ls, False if it does not exist.

  A missing path is recognised by 'java.io.FileNotFoundException' in the error text.
  Any other error (permissions, bad scheme, ...) is re-raised instead of being
  silently turned into a falsy None.
  """
  try:
    dbutils.fs.ls(checkPath)
  except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
      return False
    raise
  return True

#save to table 
def saveIntoLandingTable(tableName, inputDf):
  """Replace table `tableName` in database nhsgp with the contents of `inputDf`.

  The table is dropped, any files left at /user/hive/warehouse/<tableName> are
  removed, the frame is written there in Delta format, and the table is
  re-registered on that location.
  """
  print(f"saving table {tableName}...")
  spark.sql("use nhsgp")
  spark.sql(f"DROP TABLE IF EXISTS {tableName}")

  tablePath = f"/user/hive/warehouse/{tableName}"
  # clear leftovers first: a plain save() uses the default save mode, which fails on an existing path
  if checkPathExist(tablePath):
    dbutils.fs.rm(tablePath, True)

  deltaWriter = inputDf.write.format("delta")
  deltaWriter.save(tablePath)

  createStatement = f"CREATE TABLE {tableName} USING DELTA LOCATION '{tablePath}'"
  spark.sql(createStatement)

  
#append fact table   
def appenFactTable(tableName, newDf):
  """Append the rows of `newDf` to the Delta table stored at /user/hive/warehouse/<tableName>/.

  Implemented as a MERGE whose join condition ("1 = 2") never matches, so every
  source row goes through the not-matched insert clause. Only the ten fact
  columns listed below are inserted.
  NOTE(review): presumably a MERGE was chosen over a plain append so only these
  columns are written — confirm.
  """
  print(f"saving table {tableName}...")
  spark.sql("use nhsgp")
  deltaPath = f"/user/hive/warehouse/{tableName}/"

  target = DeltaTable.forPath(spark, deltaPath).alias("old")
  source = newDf.alias("new")

  factColumns = ["SHA", "PCT", "PRACTICE", "BNF_CODE", "BNF_NAME",
                 "ITEMS", "NIC", "ACT_COST", "QUANTITY", "PERIOD"]
  insertValues = {colName: "new." + colName for colName in factColumns}

  mergeBuilder = target.merge(source, "1 = 2")
  mergeBuilder.whenNotMatchedInsert(values=insertValues).execute()

#reading a nested JSON file 
def getColumnMappingsFromJson(filepath):
  """Flatten the nested column-mapping JSON into rows of (bnf_code, bnf_name, practice).

  The fields of each top-level struct 'bnf_code', 'bnf_name' and 'practice' are
  gathered into an array. The three arrays are then zipped by position and
  exploded into one row per position.
  NOTE(review): assumes the three structs list their entries in the same order
  (e.g. a column-oriented JSON keyed by row index) — confirm against the source file.
  """
  mappingFields = ["bnf_code", "bnf_name", "practice"]
  rawDf = spark.read.json(filepath, encoding='utf-8')

  # struct fields -> array, one array column per mapping field
  arrayDf = rawDf.select(
      *[psf.array(psf.expr(f'{field}.*')).alias(field) for field in mappingFields])

  zippedRows = psf.explode(psf.arrays_zip(*mappingFields))
  explodedDf = arrayDf.withColumn("Code_Name_Practice", zippedRows)
  return explodedDf.select(*[f"Code_Name_Practice.{field}" for field in mappingFields])

In [0]:
#list staged temp files and load each one into its landing table
rawFiles = [f for f in dbutils.fs.ls('/tmp') if f.name.endswith(('.csv', '.json', '.xlsx'))]

# column renames applied before saving; presumably needed because Delta rejects
# spaces in column names unless column mapping is enabled
chemRenames = {"CHEM SUB": "ChemSub"}
snomedRenames = {'BNF Code': 'BNF_Code', 'BNF Name': 'BNF_Name', 'SNOMED Code': 'SNOMED_Code'}
factRenames = {"BNF CODE": "BNF_CODE", "BNF NAME": "BNF_NAME", "ACT COST": "ACT_COST"}

def _applyRenames(df, renames):
  """Return `df` with every old -> new column rename in `renames` applied."""
  for oldName, newName in renames.items():
    df = df.withColumnRenamed(oldName, newName)
  return df

factDescriptionFileCounter = 0
for f in rawFiles:
  print("Processing file {}".format(f.name))

  if f.name.endswith('.json'):
    saveIntoLandingTable("landing_column_mappings", getColumnMappingsFromJson(f.path))

  if 'CHEM SUBS' in f.name:
    loadDf = spark.read.csv(f.path, inferSchema=True, header=True, encoding='UTF-8')
    saveIntoLandingTable("landing_dim_chem", _applyRenames(loadDf, chemRenames))

  if 'ADDR BNF' in f.name:
    loadDf = spark.read.csv(f.path, inferSchema=True, header=True, encoding='UTF-8')
    saveIntoLandingTable("landing_dim_practices", loadDf)

  if 'BNF Snomed Mapping data' in f.name:
    # presumably the csv produced by convertXlsxToCsv, whose first column is the pandas
    # index; the explicit schema (with enforced schema, applied by position) names it 'index'.
    # (csv instead of spark-excel: see the note on convertXlsxToCsv about performance)
    localschema = (StructType()
      .add("index", IntegerType(), True)
      .add("BNF Code", StringType(), True)
      .add("BNF Name", StringType(), True)
      .add("SNOMED Code", StringType(), True))
    loadDf = spark.read.csv(f.path, schema=localschema, header=True, encoding='UTF-8')
    saveIntoLandingTable("landing_dim_BnfSnomedMapping", _applyRenames(loadDf, snomedRenames))

  if 'PDPI BNFT.csv' in f.name:
    loadDf = spark.read.csv(f.path, inferSchema=True, header=True, encoding='UTF-8')
    loadDf = _applyRenames(loadDf, factRenames)
    # the first monthly fact file (re)creates the table; later ones are appended to it
    if factDescriptionFileCounter == 0:
      saveIntoLandingTable("landing_fact_predescription", loadDf)
    else:
      appenFactTable("landing_fact_predescription", loadDf)
    print("Loading fact file No. {}".format(factDescriptionFileCounter))
    factDescriptionFileCounter += 1

# NOTE: imported raw files are not deleted from dbfs:/tmp by this cell.
 

In [0]:

# Post-load checks: list the nhsgp tables, count the fact rows and show what is
# still staged in dbfs:/tmp. Nothing is deleted by this cell; the commented-out
# statements below are manual cleanup snippets, to be uncommented only to reset.

spark.sql('use nhsgp')
spark.sql("show tables").show(truncate=False )

# row count of the fact table (uncomment the others to check the remaining landing tables)
spark.sql("select count(1) from landing_fact_predescription").show()
# spark.sql("select count(1) from landing_dim_practices").show()
# spark.sql("select count(1) from landing_dim_chem").show()
# spark.sql("select count(1) from landing_column_mappings").show()
# spark.sql("select count(1) from landing_dim_chem").show()
# spark.sql("select count(1) from landing_dim_bnfsnomedmapping").show()



# manual cleanup: drop the landing tables
# spark.sql("drop table landing_fact_predescription ")
# spark.sql("drop table landing_dim_practices ")
# spark.sql("drop table landing_dim_chem ")
# spark.sql("drop table landing_column_mappings")



# manual cleanup: inspect / delete the downloaded files in the local /dbfs/ folder
# os.listdir('/dbfs')
# dbutils.fs.ls('/tmp')

# for f in os.listdir('/dbfs'):
#   os.remove('/dbfs/'+f)
# !ls -l '/dbfs'

# manual cleanup: delete the staged raw files from dbfs:/tmp
# dbutils.fs.rm(f.path) 
# rawFiles = [ f for f in dbutils.fs.ls('/tmp') if f.name.endswith('.csv') or f.name.endswith('.json') or f.name.endswith('.xlsx') ]
# for f in rawFiles:
#   dbutils.fs.rm(f.path)


# last expression: its value (the dbfs:/tmp listing) is shown as the cell output
dbutils.fs.ls('/tmp')
