#### Data Ingestion: Convert to Parquet
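Connect to the sink storage account and convert the landed file (CSV, pipe-delimited TXT, JSON, Parquet, XML, or XLS/XLSX) to Parquet. Validate the column types by applying either the custom schema from the framework parameters or an inferred one. Write the result to the sink path and send the resulting schema back to Azure Data Factory.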

In [0]:
%run "/Shared/Metadata Driven Ingestion Framework/Tools/utilities"


In [0]:
%run "/Shared/Metadata Driven Ingestion Framework/Development/DataIngestion/DI_02_Get_SchemaTest"

In [0]:
import re


def _read_widget(name):
  """Create a text widget `name` (empty default and label) and return its current value."""
  dbutils.widgets.text(name, "", "")
  return dbutils.widgets.get(name)


def _normalize_path(raw_path):
  """Return `raw_path` as a string with every run of two or more '/' collapsed into one."""
  return re.sub(r'/{2,}', '/', str(raw_path))


# Obtain the parameters sent by Azure Data Factory
sink_params = _read_widget("SinkGlobalParameters")
fwk_params = _read_widget("FwkParameters")
# Relative paths inside the sink container; duplicate slashes would break the abfss URLs built later.
sink_path = _normalize_path(_read_widget("SinkPath"))
landing_path_file_name = _normalize_path(_read_widget("File"))




In [0]:
import json

# Parse the JSON payloads received from Azure Data Factory.
fwk_params_dict = json.loads(fwk_params)
sink_params_dict = json.loads(sink_params)
# 'Schema' is itself a JSON-encoded string; an empty (length 0) value means the schema is inferred later.
schema = json.loads(fwk_params_dict['Schema'])

kv_scope_name = sink_params_dict["kv_scope_name"]                                     # Name of the Azure Key Vault-backed scope
kv_workspace_id = sink_params_dict["kv_workspace_id"].strip()                         # Name of the secret for the log analytics workspace id
kv_workspace_pk = sink_params_dict["kv_workspace_pk"].strip()                         # Name of the secret for the log analytics primary key
ing_sink_storage_secret_name = sink_params_dict["ing_sink_storage_secret_name"]       # Key Vault secret holding the sink storage account key
adls_source_secret = fwk_params_dict['SecretName']
ing_sink_container_name = sink_params_dict["ing_sink_container_name"]
ing_sink_storage_name = sink_params_dict["ing_sink_storage_name"]

# File name is the last path segment; its lower-cased extension selects the reader.
file_name = landing_path_file_name.split('/')[-1]
file_format = file_name.split('.')[-1].lower()
file_name = landing_path_file_name.split('/')[-1]


In [0]:
# Display the parsed custom schema; an empty value means the schema will be inferred from the file.
schema

In [0]:
def check_schema(schema, df):
  """Resolve the schema to apply when re-reading `df`.

  Parameters
  ----------
  schema : list
      Custom schema from the framework parameters. When it is empty, a schema
      is inferred from `df` with `DTypecheck`.
  df : pandas.DataFrame or Spark DataFrame
      Frame used for inference; pandas frames go through `DTypecheck.from_pdf`.

  Returns
  -------
  tuple
      (jsonschema, jsonSchemaOutput): the schema description and its formatted
      form produced by `set_format_schema`.
  """
  if len(schema) != 0:
    print('aplpying custom schema')
    return schema, set_format_schema(schema)

  print('applying infered schema')
  if isinstance(df, pd.DataFrame):
    jsonschema = DTypecheck.from_pdf(df).get_schema()
  else:
    jsonschema = DTypecheck(df).get_schema()
  # Format after both inference branches so pandas input also gets a formatted schema.
  jsonSchemaOutput = set_format_schema(jsonschema)
  return jsonschema, jsonSchemaOutput

In [0]:
parquet_status = True
path = "abfss://{}@{}.dfs.core.windows.net/{}".format(ing_sink_container_name, ing_sink_storage_name, landing_path_file_name)
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.shell import spark
  
  
try:   
  spark.conf.set("fs.azure.account.key.{}.dfs.core.windows.net".format(ing_sink_storage_name),"{}".format(dbutils.secrets.get(scope = "{}".format(kv_scope_name), key= "{}".format(ing_sink_storage_secret_name))))
  print(file_format)
  """Read data Output Files and create delta tables """    
  if file_format  == 'csv':      
    df = spark.read.format("csv").option("header","true").load(path)
    jsonschema,jsonSchemaOutput = check_schema(schema,df)
    #read the file with the schema
    df = spark.read.schema(jsonSchemaOutput).format("csv").option("header","true").load(path)
   
  elif file_format  == 'txt':
    df = spark.read.option("header", "true").option("delimiter","|").csv(path)  #as of spark1.6 you can use  csv to read txt }
    jsonschema, jsonSchemaOutput = check_schema(schema,df)
    #read the file with the infered schema
    df = spark.read.schema(jsonSchemaOutput).option("header", "true").option("delimiter","|").csv(path)  #as of spark1.6 you can use  csv to read txt }

  elif file_format == 'json':
    df = spark.read.option("multiline","true").json(path)
    jsonschema,jsonSchemaOutput = check_schema(schema,df)
    df = spark.read.schema(jsonSchemaOutput).option("multiline","true").json(path)

  elif file_format == 'parquet':
    df = spark.read.parquet(path)
    if len(schema)==0:
      schema = schema2sql(df.schema)
    
    jsonschema = schema
    jsonSchemaOutput = set_format_schema(jsonschema)  
    
  elif file_format == 'xml':
    #file_name = landing_path_file_name.split('/')[-1]
    print('file name: {}'.format(file_name))
    landing_path_file_name = landing_path_file_name.replace(file_name, '')
    print('landing path file name: {}'.format(landing_path_file_name))
    mnt_path = mount_to_mnt(landing_path_file_name, kv_scope_name, ing_sink_storage_secret_name,ing_sink_storage_name,ing_sink_container_name)
    print('mnt path: {}'.format(mnt_path))
    finalpath=mnt_path + file_name
    print(finalpath)
    df_pd = pd.read_xml(finalpath)
    df_pd=df_pd.replace('nan','NaN').fillna('NaN')
    df_pd2 = spark.createDataFrame(df_pd)
    #jsonschema,jsonSchemaOutput = check_schema(schema,df_pd)
    if len(schema)==0:
      try:
        df_pd2 = spark.createDataFrame(df_pd)
        schema = schema2sql(df_pd2.schema)
      except:
        l=(x for x in pd.io.json.build_table_schema(df_pd)['fields'] if x['name']!='index')
        schema=list(l)       
      
    jsonschema = schema
    jsonSchemaOutput = set_format_schema(jsonschema)
    jsonSchemaOutput=jsonSchemaOutput.replace('DECIMAL','FLOAT')
    jsonSchemaOutput=jsonSchemaOutput.replace('number','FLOAT')
    #read the file with the infered schema
    df=spark.createDataFrame(df_pd,jsonSchemaOutput)


    #unmount(mnt_path)
  elif file_format  == 'xls' or file_format=='xlsx':
    
    !pip install openpyxl
    !pip install xlrd
    
    print('file name: {}'.format(file_name))
    landing_path_file_name = landing_path_file_name.replace(file_name, '')
    print('landing path file name: {}'.format(landing_path_file_name))
    mnt_path = mount_to_mnt(landing_path_file_name, kv_scope_name, ing_sink_storage_secret_name,ing_sink_storage_name,ing_sink_container_name)
    
    print('mnt path: {}'.format(mnt_path))
    
    
    sc = SparkContext.getOrCreate()
    spark_session = SparkSession(sc)
    df = pd.read_excel(mnt_path + file_name, engine='openpyxl')
    df=df.replace('nan','NaN').fillna('NaN').replace('?','NaN')
    newdf = df.rename(columns=lambda x: x.replace(" ", '_'))
    newdf = newdf.rename(columns=lambda x: x.replace(":" , ''))
    if len(schema)==0:
      try:
        newdf2 = spark.createDataFrame(newdf)
        schema = schema2sql(newdf2.schema)
      except:
        l=(x for x in pd.io.json.build_table_schema(newdf)['fields'] if x['name']!='index')
        schema=list(l)
    
    jsonschema = schema
    jsonSchemaOutput = set_format_schema(jsonschema)
    jsonSchemaOutput=jsonSchemaOutput.replace('DECIMAL','FLOAT')
    jsonSchemaOutput=jsonSchemaOutput.replace('number','FLOAT')


    df = spark.createDataFrame(newdf,jsonSchemaOutput)

  save_to_adls = "abfss://{}@{}.dfs.core.windows.net/{}".format(ing_sink_container_name, ing_sink_storage_name, sink_path)
  
  df.write.format("parquet").mode("overwrite").save(save_to_adls)
  
except Exception as ex:
  print('ERROR: {}'.format(ex))
  raise Exception(f'Error: {ex}')
  msg_error = {'ExecutionStatus': 'Failed','Error Message':'ERROR in convert_tables method','FunctionName':'convert_tables'}
  post_data(kv_scope_name, kv_workspace_id, kv_workspace_pk, msg_error)

In [0]:
# Return the resolved schema to the calling Azure Data Factory activity.
# NOTE(review): `jsonschema` is a Python list, not a string; the recorded output shows its
# Python repr (single quotes), which is not valid JSON. Confirm the ADF side parses it as
# intended, or consider json.dumps(jsonschema).
dbutils.notebook.exit(jsonschema)

[{'id': '1', 'name': 'id', 'type': 'Varchar (255)'}, {'id': '2', 'name': 'author', 'type': 'Varchar (255)'}, {'id': '3', 'name': 'title', 'type': 'Varchar (255)'}, {'id': '4', 'name': 'genre', 'type': 'Varchar (255)'}, {'id': '5', 'name': 'price', 'type': 'Decimal(18,4)'}, {'id': '6', 'name': 'publish_date', 'type': 'Varchar (255)'}, {'id': '7', 'name': 'description', 'type': 'Varchar (255)'}]