In [None]:
from pyspark.sql import functions as fns
from pyspark import  SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StructType
# Wrap the active SparkContext (created on first use) in a SparkSession; the
# session is the entry point for spark.read / spark.sql in the cells below.
spark = SparkSession(SparkContext.getOrCreate())
sc = spark.sparkContext

In [None]:
import json
# Table metadata: each value supplies at least "filename", "tablename",
# "partition" and "headings" (as read by the loops below); keys are presumably
# table ids — TODO confirm against table_names.json.
# JSON is UTF-8 by specification, so decode explicitly instead of relying on the
# platform's default locale encoding.
with open("table_names.json", "r", encoding="utf-8") as f:
    table_names = json.load(f)

In [None]:

# Convert each table's CSV partition to Parquet, reading with the stored Spark
# schema so columns get the correct datatypes.
# Tables whose partition cannot be read are skipped and listed at the end instead
# of disappearing silently.
skipped_tables = []
for table in table_names.values():
    # Some tables (e.g. id 30) have very few records, possibly none.
    print(table["filename"])

    # Load the Spark schema so we read with the correct datatypes.
    schema_path = "spark_schemas/{}.json".format(table["tablename"])
    with open(schema_path, "r", encoding="utf-8") as schema_file:
        schema = StructType.fromJson(json.load(schema_file))

    csv_path = "s3a://alpha-everyone/deleteathenaout/outtemp/first_col={}".format(table["partition"])
    try:
        df = spark.read.csv(csv_path, schema=schema)
    except AnalysisException as exc:
        # Typically raised when the partition path does not exist.
        skipped_tables.append(table["tablename"])
        print("  skipped {}: {}".format(table["tablename"], exc))
        continue

    # Rename columns to the published headings. The CSV carries one extra trailing
    # column, named "filename" here; the SQL below joins tables on it.
    df = df.toDF(*(table["headings"] + ["filename"]))
    df.write.parquet(
        "s3a://alpha-everyone/deleteathenaout/abpparquet/{}".format(table["tablename"]),
        mode="overwrite",
    )

if skipped_tables:
    print("Skipped tables with no readable CSV partition: {}".format(", ".join(skipped_tables)))
    

In [None]:
# Register each converted Parquet table as a temp view named after the table, so
# the SQL cells below can query it by name (e.g. abp_blpu, abp_lpi).
# Tables with no Parquet output are reported rather than skipped silently;
# otherwise the SQL later fails with a less obvious "table not found" error.
missing_views = []
for table in table_names.values():
    parquet_path = "s3a://alpha-everyone/deleteathenaout/abpparquet/{}".format(table["tablename"])
    try:
        df = spark.read.parquet(parquet_path)
    except AnalysisException:
        # Presumably the table was skipped during CSV -> Parquet conversion.
        missing_views.append(table["tablename"])
        continue

    df.createOrReplaceTempView(table["tablename"])

if missing_views:
    print("No Parquet data; temp view not created for: {}".format(", ".join(missing_views)))


In [None]:
# Geographic addresses: one single-line address per BLPU, assembled from the
# optional organisation, the LPI secondary/primary addressable objects (SAO/PAO),
# the street descriptor and the BLPU postcode locator. Rows with
# ADDRESSBASE_POSTAL = 'N' are excluded.
#
# The organisation join is a LEFT JOIN: the WHERE clause requires b.uprn = l.uprn,
# so organisation rows without an LPI could never survive anyway.
# NOTE(review): unlike the other joins, the organisation join does not also match
# on filename — confirm this cannot duplicate rows across files.
#
# Set DEBUG_UPRN to a single UPRN (e.g. 200003654869) to restrict the query to one
# property while debugging. Leave it as None to build addresses for every property.
DEBUG_UPRN = None

sql = """


SELECT  
b.uprn as uprn,
b.postcode_locator as postcode,
CLASSIFICATION_CODE,
b.latitude,
b.longitude,

/*
Concatenate a single GEOGRAPHIC address line label
This code takes into account all possible combinations of pao/sao numbers and suffixes 
*/


case
when o.organisation is not null then o.organisation||' ' else '' end
-- Secondary Addressable Information 
||case when l.sao_text is not null then l.sao_text||' ' else '' end
-- case statement for different combinations of the sao start numbers (e.g. if no sao start suffix)
||case
when l.sao_start_number is not null and l.sao_start_suffix is null and l.sao_end_number is null
then l.sao_start_number||' '
when l.sao_start_number is null then '' else l.sao_start_number||'' end
-- case statement for different combinations of the sao start suffixes (e.g. if no sao end number)
||case
when l.sao_start_suffix is not null and l.sao_end_number is null then l.sao_start_suffix||' '
when l.sao_start_suffix is not null and l.sao_end_number is not null then l.sao_start_suffix else '' end
-- Add a '-' between the start and end of the secondary address (e.g. only when sao start and sao end)
||case
when l.sao_end_suffix is not null and l.sao_end_number is not null then '-'
when l.sao_start_number is not null and l.sao_end_number is not null then '-'else '' end
-- case statement for different combinations of the sao end numbers and sao end suffixes
||case
when l.sao_end_number is not null and l.sao_end_suffix is null then l.sao_end_number||' '
when l.sao_end_number is null then '' else l.sao_end_number end
-- sao end suffix
||case when l.sao_end_suffix is not null then l.sao_end_suffix||' ' else '' end
-- Primary Addressable Information 
||case when l.pao_text is not null then l.pao_text||' ' else '' end
-- case statement for different combinations of the pao start numbers (e.g. if no pao start suffix)
||case
when l.pao_start_number is not null and l.pao_start_suffix is null and l.pao_end_number is null
then l.pao_start_number||' '
when l.pao_start_number is null then ''
else l.pao_start_number||'' end
-- case statement for different combinations of the pao start suffixes (e.g. if no pao end number)
||case
when l.pao_start_suffix is not null and l.pao_end_number is null then l.pao_start_suffix||' '
when l.pao_start_suffix is not null and l.pao_end_number is not null then l.pao_start_suffix
else '' end
-- Add a '-' between the start and end of the primary address (e.g. only when pao start and pao end)
||case
when l.pao_end_suffix is not null and l.pao_end_number is not null then '-'
when l.pao_start_number is not null and l.pao_end_number is not null then '-'
else '' end
-- case statement for different combinations of the pao end numbers and pao end suffixes
||case
when l.pao_end_number is not null and l.pao_end_suffix is null then l.pao_end_number||' '
when l.pao_end_number is null then ''
else l.pao_end_number end
-- pao end suffix
||case when l.pao_end_suffix is not null then l.pao_end_suffix||' ' else '' end
-- Street Information 
||case when s.street_description is not null then s.street_description||' ' else '' end
-- Locality----- 
||case when s.locality_name is not null then s.locality_name||' ' else '' end

-- Town-------- 
||case when s.town_name is not null then s.town_name||' ' else '' end
-- Postcode--- 
||case when b.postcode_locator is not null then b.postcode_locator else '' end
AS full_address
 
 
 
 
FROM 
abp_street_descriptor AS s, 
abp_classification as c,
abp_lpi as l 
left join abp_organisation AS o on (l.uprn = o.uprn),
abp_blpu AS b


WHERE b.uprn = l.uprn and b.filename=l.filename
AND l.usrn = s.usrn and l.filename=s.filename
AND b.uprn = c.uprn and b.filename=c.filename
and ADDRESSBASE_POSTAL != 'N' 
"""

if DEBUG_UPRN is not None:
    # WHERE is the final clause, so an extra AND condition can be appended.
    # int() ensures only a number is interpolated into the SQL text.
    sql += "and b.uprn={}\n".format(int(DEBUG_UPRN))

In [None]:
# Build the geographic-address DataFrame and print a preview of it
# (n=20 and truncate=True are show()'s defaults, spelled out here).
geographic_addresses = spark.sql(sqlQuery=sql)
geographic_addresses.show(n=20, truncate=True)

In [None]:
# Delivery-point (postal) addresses: one single-line address per delivery point,
# with classification and coordinates taken from the matching BLPU.
# Every component, including the postcode, is NULL-guarded. In Spark SQL,
# `'x' || NULL` is NULL, so a bare `|| postcode` would blank the whole address
# whenever the postcode is missing. This matches the geographic query above.
# NOTE(review): unlike the geographic query, these joins do not also match on
# filename — confirm this cannot duplicate rows across files.
sql = """


SELECT
d.uprn as uprn,
postcode,
CLASSIFICATION_CODE,
b.latitude,
b.longitude,
(
 CASE WHEN department_name IS NOT NULL THEN department_name || ' ' ELSE '' END
 || CASE WHEN organisation_name IS NOT NULL THEN organisation_name || ' ' ELSE '' END
 || CASE WHEN sub_building_name IS NOT NULL THEN sub_building_name || ' ' ELSE '' END
 || CASE WHEN building_name IS NOT NULL THEN building_name || ' ' ELSE '' END
 || CASE WHEN building_number IS NOT NULL THEN building_number || ' ' ELSE '' END
 || CASE WHEN po_box_number IS NOT NULL THEN 'PO BOX ' || po_box_number || ' ' ELSE '' END
 || CASE WHEN dependent_thoroughfare IS NOT NULL THEN dependent_thoroughfare || ' ' ELSE '' END
 || CASE WHEN thoroughfare IS NOT NULL THEN thoroughfare || ' ' ELSE '' END
 || CASE WHEN double_dependent_locality IS NOT NULL THEN double_dependent_locality || ' ' ELSE '' END
 || CASE WHEN dependent_locality IS NOT NULL THEN dependent_locality  || ' ' ELSE '' END
 || CASE WHEN post_town IS NOT NULL THEN post_town || ' ' ELSE '' END
 || CASE WHEN postcode IS NOT NULL THEN postcode ELSE '' END
) AS full_address
FROM abp_delivery_point as d
left join abp_blpu as b
on d.uprn=b.uprn
left join abp_classification as c
on b.uprn = c.uprn


"""

In [None]:
# Lazily define the delivery-point address DataFrame from the query above.
delivery_addresses = spark.sql(sqlQuery=sql)

In [None]:
# Preview a few delivery addresses. limit() runs on the cluster, so only 5 rows
# reach the driver. toPandas() on the full frame would collect every delivery
# point into local memory before head() threw most of them away.
delivery_addresses.limit(5).toPandas()

In [None]:
# Stack geographic and delivery-point addresses into one DataFrame.
# DataFrame.union matches columns by POSITION, not by name, so first check that
# both queries yield the same columns in the same order. The comparison is
# case-insensitive because the SQL mixes upper- and lower-case column references.
# union keeps duplicates (UNION ALL semantics), so a property can appear once
# per source.
assert [c.lower() for c in geographic_addresses.columns] == [c.lower() for c in delivery_addresses.columns], (
    "column mismatch: {} vs {}".format(geographic_addresses.columns, delivery_addresses.columns)
)
all_addresses = geographic_addresses.union(delivery_addresses)

In [None]:
# Preview the combined addresses. Calling toPandas() on the whole DataFrame would
# pull every address onto the driver just to display it; limit() keeps the
# collect small.
all_addresses.limit(20).toPandas()