In [50]:
import pyspark.sql.functions as f
import json

## Set Parameters

In [55]:
# Section switches: a section below runs only when its flag equals 1.
ownership_flag: int = 1
contracting_flag: int = 1
sanctions_flag: int = 1

# Folder (glob) under the 'raw' container to read; the read cells append a trailing '*'.
raw_folderpath: str = ""
# Comma-separated country names to filter on; empty string disables country filtering.
country_names: str = ""
# Comma-separated alternative spellings/codes for the countries; may be left empty.
country_name_alternatives: str = ""


In [None]:
#Create country list
def _split_csv_names(value):
    """Split a comma-separated parameter string into a list of names.

    Whitespace around each name is stripped and empty entries are dropped, so
    "Kenya, Uganda" -> ["Kenya", "Uganda"] and "" -> [].
    Values that are not strings (e.g. already-split lists when this cell is
    re-run) are returned unchanged, which keeps the cell idempotent.
    """
    if isinstance(value, str):
        return [name.strip() for name in value.split(',') if name.strip()]
    return value

# Both parameters become lists (possibly empty), so later cells can test truthiness
# and treat them as lists without special-casing the empty-string parameter.
country_names = _split_csv_names(country_names)
country_name_alternatives = _split_csv_names(country_name_alternatives)

In [40]:
## Test Parameters
# ownership_flag = 1
# contracting_flag = 1
# sanctions_flag = 1

# Contracting
# raw_folderpath = 'BeneficialOwnership/OpenData/Contracting/*/AllTime/v1/full/*/*/*/*'

# Ownership
# raw_folderpath = 'BeneficialOwnership/OpenData/Ownership/AllData/v1/full/*/*/*/*'

# Sanctions
# raw_folderpath = 'BeneficialOwnership/OpenData/Sanctions/AllData/v1/full/*/*/*/*'



## Set Storage Config

In [6]:
#Storage Config
storageLinkedService = 'LS_DataLake'
storageAccount_ls = mssparkutils.credentials.getPropertiesAll(storageLinkedService)
# The linked service's 'Endpoint' is parsed as "https://<account>.<...>"; keep only <account>.
storageAccountName = json.loads(storageAccount_ls)['Endpoint'].split('.')[0].replace('https://','')
# ABFS / Synapse per-account settings are keyed by the fully-qualified DFS host name.
storageAccountHost = f"{storageAccountName}.dfs.core.windows.net"

# Authenticate abfss:// access to this account through the linked service's token provider.
spark.conf.set(f"spark.storage.synapse.{storageAccountHost}.linkedServiceName", storageLinkedService)
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storageAccountHost}","com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
spark.conf.set("spark.sql.adaptive.enabled",True)

## Define Flatten & Explode Function

In [7]:
def flatten_and_explode(df,limit_cols=None):

    """
    Recursively flatten struct columns and explode array columns of a PySpark DataFrame.

    - A struct column `c` is replaced by one top-level column `c_<field>` per field.
    - An array column is exploded to one row per element; rows whose array is null or
      empty are kept once, with a null in that column.
    Passes repeat until no eligible struct/array column remains.

    Inputs:
        df: DataFrame to be flattened.
        limit_cols: optional list of column names to process (speeds up the process).
            The first pass considers every column of `df`; later passes only process
            struct/array columns named in `limit_cols`. Once a struct has been flattened,
            columns not named in `limit_cols` are no longer carried through the
            flattening select and can therefore be dropped.
    Returns:
        The flattened and exploded DataFrame.
    """
    # None means "no restriction"; a set gives O(1) membership checks.
    allowed = None if limit_cols is None else set(limit_cols)

    def _classify(frame, restrict_to=None):
        # Split frame's columns into (array, struct, other) name lists, keeping only
        # names in restrict_to when it is given.
        array_names, struct_names, other_names = [], [], []
        for name, dtype in frame.dtypes:
            if restrict_to is not None and name not in restrict_to:
                continue
            if dtype.startswith('array'):
                array_names.append(name)
            elif dtype.startswith('struct'):
                struct_names.append(name)
            else:
                other_names.append(name)
        return array_names, struct_names, other_names

    # First pass: every column of the input is a candidate.
    array_cols, struct_cols, other_cols = _classify(df)

    while array_cols or struct_cols:

        # Flatten struct columns. Iterate over this pass's list; the column lists are
        # re-derived (restricted to limit_cols) after each struct is expanded.
        for c in list(struct_cols):
            field_cols = [f.col(c + '.' + x).alias(c + '_' + x) for x in df.select(c + '.*').columns]
            df = df.select(other_cols + array_cols + struct_cols + field_cols).drop(c)
            array_cols, struct_cols, other_cols = _classify(df, allowed)
            print('processed struct col '+str(c))

        # Explode array columns. explode_outer keeps null/empty arrays as a single row
        # with a null value in one pass; a filter + union would duplicate the whole
        # upstream plan for every array column processed.
        for c in array_cols:
            df = df.withColumn(c, f.explode_outer(f.col(c)))
            print('processed array col '+str(c))

        # Update column lists for next loop
        array_cols, struct_cols, other_cols = _classify(df, allowed)

    return df

In [8]:
def df_select_columns(source_df, selected_columns):
    """
    Select `selected_columns` from `source_df`, tolerating names that do not exist,
    and strip underscores from the output column names.

    Columns missing from `source_df` are added as null string columns, so that the
    callers' isNull()/isNotNull() checks treat an absent field as missing. An
    empty-string placeholder would pass isNotNull() and send incomplete records to the
    "Flattened" output. The explicit cast avoids Spark's void (NullType) column type,
    which cannot be written to Parquet.

    Inputs: source_df = DataFrame to select from, selected_columns = list of column names
    Returns: DataFrame with one column per entry of selected_columns, in that order,
             named with underscores removed (e.g. 'parties_id' -> 'partiesid').
    """
    existing_columns = set(source_df.columns)
    columns_to_select = [
        (f.col(column) if column in existing_columns else f.lit(None).cast("string"))
        .alias(column.replace("_", ""))
        for column in selected_columns
    ]
    return source_df.select(columns_to_select)

## Read, Flatten, Write Data

In [78]:
#OpenContracting
if contracting_flag==1:
    # Resolve paths against the storage account discovered in the Storage Config cell.
    raw_path = f'abfss://raw@{storageAccountName}.dfs.core.windows.net/{raw_folderpath}*'
    staging_root = f'abfss://staging@{storageAccountName}.dfs.core.windows.net/BeneficialOwnership/OpenData/Contracting'

    #Read - retried once on failure (presumably to ride out transient storage errors);
    # `except Exception` so that KeyboardInterrupt/SystemExit are not swallowed.
    try:
        df = spark.read.json(raw_path)
    except Exception:
        df = spark.read.json(raw_path)
    df = df.withColumn('filename',f.input_file_name())

    # Keep only files whose path matches one of the country names (used as a regex alternation).
    if country_names:
        df = df.where(f.col('filename').rlike("|".join(country_names)))

    #Select, Flatten, and Explode
    df = df_select_columns(df,['date','parties','tender','awards','filename'])
    limit_cols = ['date','parties','parties_address','parties_address_streetAddress','parties_address_postalCode','parties_address_countryName','parties_id','parties_name','parties_contactPoint','parties_contactPoint_email','parties_contactPoint_telephone','parties_contactPoint_url','parties_roles','tender','tender_id','tender_title','awards','awards_items','awards_items_id','awards_items_description','tender_lots','tender_lots_id','tender_lots_description','filename']
    flat_df = flatten_and_explode(df,limit_cols)

    select_cols = ['date','parties_address_streetAddress','parties_address_postalCode','parties_address_countryName','parties_id','parties_name','parties_contactPoint_email','parties_contactPoint_telephone','parties_contactPoint_url','parties_roles','tender_id','tender_title','awards_items_id','awards_items_description','tender_lots_id','tender_lots_description','filename']
    df_select = df_select_columns(flat_df,select_cols)

    # Required fields (names have their underscores removed by df_select_columns).
    # isNotNull() is never null, so ~required_present is the exact complement.
    required_present = f.col('partiesid').isNotNull() & f.col('date').isNotNull() & f.col('partiesroles').isNotNull() & f.col('tenderid').isNotNull()

    #Write NonNull Records
    df_select_notnull = df_select.where(required_present)
    df_select_notnull.write.mode('overwrite').parquet(staging_root + '/Flattened')

    #Write Null Records
    df_select_null = df_select.where(~required_present)
    df_select_null.write.mode('overwrite').parquet(staging_root + '/ContractingDF_FailedPreProcessing_Staging')

In [None]:
#OpenOwnership
if ownership_flag==1:
    # Resolve paths against the storage account discovered in the Storage Config cell.
    raw_path = f'abfss://raw@{storageAccountName}.dfs.core.windows.net/{raw_folderpath}*'
    staging_root = f'abfss://staging@{storageAccountName}.dfs.core.windows.net/BeneficialOwnership/OpenData/Ownership'

    # Country values accepted by the filters below: the alternatives followed by the
    # primary names, each as its own element. Built as a new local list (not appended to
    # the shared parameters) so re-running cells does not keep growing them; `or []`
    # covers parameters left empty.
    country_filter = list(country_name_alternatives or []) + list(country_names or [])

    #Read - retried once on failure (presumably to ride out transient storage errors);
    # `except Exception` so that KeyboardInterrupt/SystemExit are not swallowed.
    try:
        df = spark.read.json(raw_path)
    except Exception:
        df = spark.read.json(raw_path)

    #Select, Flatten, and Explode

    #Entity
    df_entity = df_select_columns(df.where(f.col('statementType')=='entityStatement'),['statementID','statementType','name','addresses','identifiers'])
    limit_cols = ['statementID','statementType','name','identifiers','addresses','addresses_address','addresses_country','addresses_type','identifiers_id']
    entity_flat = flatten_and_explode(df_entity,limit_cols)

    #Limit country - exact match on the address country value
    if country_names:
        entity_flat = entity_flat.where(f.col('addresses_country').isin(country_filter))

    select_cols = ['statementID','statementType','name','addresses_address','addresses_country','addresses_type','identifiers_id']
    df_select_entity = df_select_columns(entity_flat,select_cols)
    entity_complete = f.col('identifiersid').isNotNull() & f.col('addressesaddress').isNotNull()

    #Write NonNull Records
    df_select_entity_notnull = df_select_entity.where(entity_complete)
    df_select_entity_notnull.write.mode('overwrite').parquet(staging_root + '/Entity/Flattened')

    #Write Null Records
    df_select_entity_null = df_select_entity.where(~entity_complete)
    df_select_entity_null.write.mode('overwrite').parquet(staging_root + '/Entity/OwnershipDF_FailedPreProcessing')

    #Person
    df_person = df_select_columns(df.where(f.col('statementType')==f.lit('personStatement')),['statementID','statementType','identifiers','personType','addresses','names','nationalities'])
    limit_cols = ['statementID','statementType','names','identifiers','identifiers_id','addresses','addresses_address','nationalities','personType','names_fullName','nationalities_name','nationalities_code']
    person_flat = flatten_and_explode(df_person,limit_cols)

    #Limit country - the free-text address must contain one of the values (regex alternation)
    if country_names:
        person_flat = person_flat.where(f.col('addresses_address').rlike("|".join(country_filter)))

    select_cols = ['statementID','statementType','identifiers_id','personType','addresses_address','names_fullName','nationalities_name','nationalities_code']
    df_select_person = df_select_columns(person_flat,select_cols)
    person_complete = f.col('identifiersid').isNotNull() & f.col('namesfullName').isNotNull()

    #Write NonNull Records
    df_select_person_notnull = df_select_person.where(person_complete)
    df_select_person_notnull.write.mode('overwrite').parquet(staging_root + '/Person/Flattened')

    #Write Null Records
    df_select_person_null = df_select_person.where(~person_complete)
    df_select_person_null.write.mode('overwrite').parquet(staging_root + '/Person/OwnershipDF_FailedPreProcessing')


In [None]:
#OpenSanctions
if sanctions_flag==1:
    # Resolve paths against the storage account discovered in the Storage Config cell.
    raw_path = f'abfss://raw@{storageAccountName}.dfs.core.windows.net/{raw_folderpath}*'
    staging_root = f'abfss://staging@{storageAccountName}.dfs.core.windows.net/BeneficialOwnership/OpenData/Sanctions'

    # Country values accepted by the address filter: the alternatives followed by the
    # primary names, each as its own element. Built as a new local list (not appended to
    # the shared parameters) so re-running cells does not keep growing them; `or []`
    # covers parameters left empty.
    country_filter = list(country_name_alternatives or []) + list(country_names or [])

    #Read
    df = spark.read.json(raw_path)

    # Flatten according to schema (faster than all at once)

    # Company and Person
    df_company_person = df_select_columns(df.where((f.col('schema') == 'Company') | (f.col('schema') == 'Person')),['caption','id','referents','schema','properties'])
    limit_cols = ['caption','id','referents','schema','properties','properties_name','properties_addressEntity']
    company_person_flat = flatten_and_explode(df_company_person,limit_cols).select('caption','id','referents','schema','properties_name','properties_addressEntity')

    # Address
    df_address = df_select_columns(df.where(f.col('schema')=='Address'),['caption','id','referents','schema','properties'])
    limit_cols = ['caption','id','referents','schema','properties','properties_full']
    address_flat = flatten_and_explode(df_address,limit_cols).select('caption','id','referents','schema','properties_full')

    # Limit country - the full address text must contain one of the values (regex alternation)
    if country_names:
        address_flat = address_flat.where(f.col('properties_full').rlike("|".join(country_filter)))

    # Sanction
    df_sanction = df_select_columns(df.where(f.col('schema')=='Sanction'),['caption','id','referents','schema','properties'])
    limit_cols = ['caption','id','referents','schema','properties','properties_entity','properties_startDate','properties_endDate','properties_reason','properties_authority']
    sanction_flat = flatten_and_explode(df_sanction,limit_cols).select('caption','id','referents','schema','properties_entity','properties_startDate','properties_endDate','properties_reason','properties_authority')

    # Join Sanction to Person/Company to get addressEntity
    sanction_cols = [sanction_flat.caption,sanction_flat.id,'properties_name','properties_entity','properties_startDate','properties_endDate','properties_reason','properties_authority','properties_addressEntity']

    # join on id
    sanction_id = sanction_flat.join(company_person_flat,sanction_flat.properties_entity==company_person_flat.id, how='inner').select(*sanction_cols)

    # join on referents
    sanction_referents = sanction_flat.join(company_person_flat,sanction_flat.properties_entity==company_person_flat.referents, how='inner').select(*sanction_cols)

    # append
    sanction_append = sanction_id.unionByName(sanction_referents, allowMissingColumns=True).distinct()

    # Join to Address to get full Address (join on addressEntity)
    # NOTE(review): this is a left join, so sanctions whose address was removed by the
    # country filter above are still kept (with a null properties_full). Confirm whether
    # an inner join is intended when country_names is set.
    sanction_join = sanction_append.join(address_flat,sanction_append.properties_addressEntity==address_flat.id,how='left').select(sanction_append.id,sanction_append.caption,'properties_name','properties_entity','properties_startDate','properties_endDate','properties_reason','properties_authority','properties_full').distinct()

    # Output names drop underscores, matching df_select_columns used by the other cells.
    renamed_columns = [f.col(column_name).alias(column_name.replace("_", "")) for column_name in sanction_join.columns]
    sanction_complete = f.col('id').isNotNull() & f.col('caption').isNotNull()

    #Write NonNull Records
    df_select_sanctions_notnull = sanction_join.where(sanction_complete).select(*renamed_columns)
    df_select_sanctions_notnull.write.mode('overwrite').parquet(staging_root + '/Flattened')

    #Write Null Records
    df_select_sanctions_null = sanction_join.where(~sanction_complete).select(*renamed_columns)
    df_select_sanctions_null.write.mode('overwrite').parquet(staging_root + '/SanctionsDF_FailedPreProcessing')
