# Checking BI data in HDFS via Spark SQL #

## Background ##

### Source data files ###

* The Business Index (BI) data ingestion process loads data from "external" source files into HDFS.
* External sources are:

> * Companies House (multiple CSV files)
> * HMRC - PAYE (CSV - currently extracted from ONS IDBR)
> * HMRC - VAT (CSV - currently extracted from ONS IDBR)

* We also load a file of "links", i.e. triples of Companies House | VAT | PAYE references that have been matched during the data science pre-processing stage.
* Each field in the triple may be empty, and there may be multiple VAT or PAYE references in one record.
* The "links" file (also described as "legal entities") is in JSON format.
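To make the shape of a links record concrete, here is a minimal plain-Python sketch. It assumes one JSON object per line (the layout Spark's JSON reader expects by default) and uses hypothetical field names `CH`, `VAT` and `PAYE` with invented reference values; check `linksDf.printSchema()` for the real structure. It shows that any field may be missing and that VAT and PAYE hold lists of references.

```python
import json

# Hypothetical links records, one JSON object per line.
# Field names (CH, VAT, PAYE) and all reference values are assumptions for illustration.
SAMPLE_LINKS = """\
{"CH": "01234567", "VAT": [220062373000, 220062373001], "PAYE": ["123AB456"]}
{"VAT": [987654321000]}
{"CH": "07654321", "PAYE": []}
"""

def parse_links(text):
    """Parse newline-delimited JSON into dicts, normalising missing fields."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        rec = json.loads(line)
        records.append({
            "CH": rec.get("CH"),            # single company number, or None
            "VAT": rec.get("VAT") or [],    # zero or more VAT references
            "PAYE": rec.get("PAYE") or [],  # zero or more PAYE references
        })
    return records

for rec in parse_links(SAMPLE_LINKS):
    print(rec["CH"], len(rec["VAT"]), "VAT ref(s),", len(rec["PAYE"]), "PAYE ref(s)")
```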

### BI intermediate Parquet files ###

* The BI data ingestion process has to perform multiple processing steps, e.g. generating or identifying the UBRN, matching link references to the corresponding CH/VAT/PAYE records, constructing the final Business Index record, etc.
* These steps are described in the README documentation for the BI project in GitHub.
* We store the intermediate data for each step in HDFS files in Parquet format, a compressed, columnar format that makes querying and processing the data (e.g. in Apache Spark) more efficient.
* This allows us to explore the intermediate data e.g. to look for bugs, check data contents etc.
* This Jupyter notebook shows how you can use Spark SQL to do this.

### Loading BI source data files into Parquet ###

* The first thing we do with each of the external source data files described above (CH, PAYE, VAT) is to load it into Parquet, to make it easier and quicker to process later on.
* At this point we do not apply any extra transformations to the data, so the Parquet file should have the same record structure as the corresponding source data file.
* The links data is modified during the initial load process to add a UBRN (see below).

## Initialisation ##

* The Jupyter notebooks installation on the ONS Cloudera cluster provides a default SparkContext object, available as "sc".
* We also need a SQLContext object to run Spark SQL queries.

In [None]:
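from pyspark.sql import SQLContext

# Wrap the notebook's existing SparkContext so we can run Spark SQL queries
sqlContext = SQLContext(sc)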

## HDFS file locations ##

* Check the README docs in GitHub for more information on file locations.
* Our intermediate working data is stored under the "WORKINGDATA" directory.
* The following code assumes we are using the default file names as specified in the BI application configuration.

In [None]:
# HDFS working directory roots
### CHECK THE BASE DIRECTORY MATCHES YOUR INSTALLATION ###
# A relative HDFS path is resolved against your HDFS home directory (usually /user/<username>)
baseDir = "./ons.gov/businessIndex/dev"
wDir = "{0}/WORKINGDATA".format(baseDir)

# Source data files loaded into Parquet
chFile = "{0}/CH.parquet".format(wDir)
vatFile = "{0}/VAT.parquet".format(wDir)
payeFile = "{0}/PAYE.parquet".format(wDir)
linksFile = "{0}/LINKS_Output.parquet".format(wDir)

# Generated Business Index data file (records will be copied to Elasticsearch)
biFile = "{0}/BI_Output.parquet".format(wDir)

# Previous links data (used when we have to apply "month 2+" logic to UBRN)
prevDir = "{0}/PREVIOUS".format(baseDir)
prevFile = "{0}/LINKS_Output.parquet".format(prevDir)
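If you switch between installations often, it can help to derive every path from the base directory in one place. The sketch below is a hypothetical helper, `bi_paths`, that reproduces the default file names from the cell above using `posixpath` (HDFS paths always use forward slashes); it is not part of the BI application, and the names must be kept in step with your BI configuration.

```python
import posixpath

def bi_paths(base_dir):
    """Return the notebook's HDFS paths for a given base directory.

    Hypothetical helper: uses the same default file names as the cell above;
    change them here if your BI configuration overrides them.
    """
    w_dir = posixpath.join(base_dir, "WORKINGDATA")
    prev_dir = posixpath.join(base_dir, "PREVIOUS")
    return {
        "ch": posixpath.join(w_dir, "CH.parquet"),
        "vat": posixpath.join(w_dir, "VAT.parquet"),
        "paye": posixpath.join(w_dir, "PAYE.parquet"),
        "links": posixpath.join(w_dir, "LINKS_Output.parquet"),
        "bi": posixpath.join(w_dir, "BI_Output.parquet"),
        "prev_links": posixpath.join(prev_dir, "LINKS_Output.parquet"),
    }

paths = bi_paths("./ons.gov/businessIndex/dev")
for name, path in sorted(paths.items()):
    print("{0:<11} {1}".format(name, path))
```

Pointing the notebook at another environment then only means calling `bi_paths` with that environment's base directory.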

## Links data in Parquet ##

* The Links file is pre-processed to allocate each record a UBRN.
* The UBRN allocation rules are still evolving, but they will involve comparing the latest file's data with the links loaded in previous months.
* The Parquet file contains the UBRN allocated via this process.

In [None]:
# Read the file into Spark
linksDf = sqlContext.read.parquet(linksFile)

# Count the records (forces data to be materialised)
print("LINKS contains {0} records.".format(str(linksDf.count())))

# Make the file available as a Spark SQL table
linksDf.registerTempTable("links")

# Display the record structure
linksDf.printSchema()

In [None]:
# Display the first few records
linksDf.show(10)
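Because UBRN allocation depends on the links loaded in previous months, a useful exploration is to compare the current links with the previous ones (`prevFile`). The plain-Python sketch below only illustrates that comparison; it is not the UBRN allocation logic. It assumes each snapshot can be reduced to a mapping from UBRN to that UBRN's VAT references, and all values are invented. In Spark SQL, after registering the previous file as a table (hypothetically named `prev_links`) and assuming the column is called `UBRN`, new UBRNs could be found with `SELECT l.UBRN FROM links l LEFT OUTER JOIN prev_links p ON l.UBRN = p.UBRN WHERE p.UBRN IS NULL`.

```python
# Hypothetical link snapshots keyed by UBRN; values are tuples of VAT references.
previous = {1: (220062373000,), 2: (111111111000,)}
current = {1: (220062373000,), 2: (111111111000, 333333333000), 3: (444444444000,)}

def diff_links(prev, curr):
    """Classify UBRNs as new, dropped or changed between two link snapshots."""
    prev_ids, curr_ids = set(prev), set(curr)
    return {
        "new": sorted(curr_ids - prev_ids),
        "dropped": sorted(prev_ids - curr_ids),
        "changed": sorted(u for u in curr_ids & prev_ids if curr[u] != prev[u]),
    }

print(diff_links(previous, current))
```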

### Example of SQL query on links data ###

* Remember that PAYE and VAT references are actually arrays in the links record.
* You can refer to the first VAT reference in the array as "VAT[0]", for example (array indices start at 0).

In [None]:
# Find up to 5 links whose first VAT reference matches a given value
lndata = sqlContext.sql("SELECT * FROM links WHERE VAT[0] = 220062373000 LIMIT 5")
lndata.show()
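Since a link can hold several VAT references, `VAT[0] = ...` only finds records where the reference happens to be first, and an empty array has no element 0 at all. The plain-Python sketch below models the difference using invented records. In Spark SQL, the built-in `array_contains` function should give the "any element" behaviour, e.g. `SELECT * FROM links WHERE array_contains(VAT, 220062373000)`, assuming VAT is a numeric array as the query above implies.

```python
# Plain-Python model of two ways to match a VAT reference held in an array column.
# Records and reference values are invented for illustration.
links = [
    {"UBRN": 1, "VAT": [220062373000]},
    {"UBRN": 2, "VAT": [111111111000, 220062373000]},  # wanted ref is second
    {"UBRN": 3, "VAT": []},                             # no VAT refs at all
]

def first_vat_is(rec, ref):
    """Mimics `VAT[0] = ref`: only the first element is checked."""
    return len(rec["VAT"]) > 0 and rec["VAT"][0] == ref

def any_vat_is(rec, ref):
    """Mimics `array_contains(VAT, ref)`: any element may match."""
    return ref in rec["VAT"]

target = 220062373000
print([r["UBRN"] for r in links if first_vat_is(r, target)])
print([r["UBRN"] for r in links if any_vat_is(r, target)])
```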

## Companies House data in Parquet ##

In [None]:
chDf = sqlContext.read.parquet(chFile)

print("CH contains {0} records.".format(str(chDf.count())))

chDf.registerTempTable("ch")

chDf.printSchema()

### Example of SQL query on CH data ###

In [None]:
# Count rows whose CompanyNumber is the literal column name, i.e. CSV header lines loaded as data
data = sqlContext.sql("SELECT COUNT(*) AS bad_recs FROM ch WHERE CompanyNumber = 'CompanyNumber'")
data.show()
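A row whose CompanyNumber holds the literal text "CompanyNumber" is what you would see if a CSV header line were loaded as an ordinary record. Because Companies House data arrives as multiple CSV files, one way this can happen is if the files are combined but only the first header is treated as a header. The plain-Python sketch below reproduces that effect with two invented extracts; only the `CompanyNumber` column name comes from the query above.

```python
import csv
import io

# Two hypothetical CH CSV extracts, each with its own header line.
# "CompanyNumber" matches the column queried above; the other column is invented.
PART_1 = "CompanyName,CompanyNumber\nEXAMPLE ONE LTD,00000001\n"
PART_2 = "CompanyName,CompanyNumber\nEXAMPLE TWO LTD,00000002\n"

def load_concatenated(*parts):
    """Parse several CSV files as if they were one file with a single header."""
    return list(csv.DictReader(io.StringIO("".join(parts))))

rows = load_concatenated(PART_1, PART_2)
bad_recs = [r for r in rows if r["CompanyNumber"] == "CompanyNumber"]
print(len(rows), "rows,", len(bad_recs), "header row(s) loaded as data")
```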

## VAT data in Parquet ##

In [None]:
vatDf = sqlContext.read.parquet(vatFile)

print("VAT contains {0} records.".format(str(vatDf.count())))

vatDf.registerTempTable("vat")

vatDf.printSchema()

### Example of SQL query on VAT data ###

In [None]:
vatData = sqlContext.sql("SELECT * FROM vat WHERE vatref = 656091134000").limit(1)
vatData.show()

## PAYE data in Parquet ##

In [None]:
payeDf = sqlContext.read.parquet(payeFile)
print("PAYE contains {0} records.".format(str(payeDf.count())))
payeDf.registerTempTable("paye")
payeDf.printSchema()

### Example of SQL query on PAYE data ###

In [None]:
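# Look for suspiciously short PAYE references and print the first few
payeData = sqlContext.sql("SELECT payeref, name1 FROM paye WHERE LENGTH(payeref) = 4").limit(5)
paye_recs = payeData.collect()

# collect() returns Row objects, whose fields can be accessed by name
for pr in paye_recs:
    print("Ref: {0}  Name: {1}".format(pr['payeref'], pr['name1']))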
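Rather than guessing which reference lengths are suspicious, you could first look at the overall distribution, e.g. with `SELECT LENGTH(payeref) AS ref_len, COUNT(*) AS n FROM paye GROUP BY LENGTH(payeref) ORDER BY ref_len`. The plain-Python sketch below shows the same profiling idea on a handful of invented references; a missing reference is counted under `None`, much as SQL would group NULL lengths together.

```python
from collections import Counter

# Hypothetical PAYE references; in the notebook these would come from the paye table.
payerefs = ["123AB456", "123AB457", "120Z", "9876543AB", None]

def length_profile(refs):
    """Count references by length (None for missing), like GROUP BY LENGTH(payeref)."""
    return Counter(len(r) if r is not None else None for r in refs)

profile = length_profile(payerefs)
# Print numeric lengths in ascending order, with the missing-value group last
for length, n in sorted(profile.items(), key=lambda kv: (kv[0] is None, kv[0] or 0)):
    print(length, n)
```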

## Business Index data in Parquet ##

* The BI data file is constructed by the data ingestion process.
* We join the incoming links to the corresponding CH/PAYE/VAT data.
* Then we build a record with the required fields from each source.
* This BI record is written to a Parquet file.
* The final step of BI data ingestion simply copies these records to Elasticsearch.
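The real pipeline performs these joins in Spark; the plain-Python sketch below only illustrates the shape of the step for a single link. The lookup tables, their field names and the name precedence (CH first, then the first matched VAT record, then the first matched PAYE record) are all assumptions made to keep the example concrete. The actual field selection rules are described in the BI README. The output keys reuse some BI column names from the query below.

```python
# Hypothetical lookups keyed by reference, standing in for the CH/VAT/PAYE tables.
CH = {"01234567": {"CompanyName": "EXAMPLE LTD", "PostCode": "AB1 2CD"}}
VAT = {220062373000: {"name": "EXAMPLE TRADING", "postcode": "AB1 2CD"}}
PAYE = {"123AB456": {"name1": "EXAMPLE PAYROLL"}}

def build_bi_record(link):
    """Join one link to its source records and pick output fields.

    Assumed name precedence: CH, then first matched VAT, then first matched PAYE.
    """
    ch = CH.get(link.get("CH"))
    vats = [VAT[v] for v in link.get("VAT") or [] if v in VAT]
    payes = [PAYE[p] for p in link.get("PAYE") or [] if p in PAYE]

    if ch:
        name, postcode = ch["CompanyName"], ch["PostCode"]
    elif vats:
        name, postcode = vats[0]["name"], vats[0]["postcode"]
    elif payes:
        name, postcode = payes[0]["name1"], None
    else:
        name, postcode = None, None

    return {
        "UBRN": link["UBRN"],
        "BusinessName": name,
        "PostCode": postcode,
        "PayeRefs": list(link.get("PAYE") or []),
    }

print(build_bi_record({"UBRN": 1, "CH": "01234567", "VAT": [220062373000], "PAYE": ["123AB456"]}))
print(build_bi_record({"UBRN": 2, "PAYE": ["123AB456"]}))
```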

In [None]:
biDf = sqlContext.read.parquet(biFile)

print("BI contains {0} records.".format(str(biDf.count())))

biDf.registerTempTable("bi")

biDf.printSchema()

### Example of SQL query on BI data ###

In [None]:
data = sqlContext.sql("SELECT BusinessName, PostCode, IndustryCode, LegalStatus, EmploymentBands, PayeRefs FROM bi WHERE BusinessName = 'MOTA-TEST'").limit(1)
data.show()