In [5]:
#
#  -- Learn Fabric Spark --
#
#  lesson 6 - dynamic full load
#    - rebuild tables
#    - show sample rows
#


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 7, Finished, Available)

In [6]:

# Inputs for the full load. Later cells read them as follows:
#   var_path      - folder appended to 'Files/' to build the read path
#   var_table     - core of the tmp_/bronze_/silver_ object names
#   var_delimiter - csv reader "delimiter" option
#   var_header    - csv reader "header" option
#   var_schema    - schema handed to the csv reader; None or blank means no schema
# NOTE(review): the saved outputs further down show 'snp500' object names, so the
# notebook was presumably last run with a different var_table - confirm before
# comparing outputs with these values.
var_path = "raw/stocks/S&P-2013"
var_table = "stocks_2013"
var_delimiter = ","
var_header = "true"
var_schema = (
    "symbol string, date string, "
    "open decimal(19,4), high decimal(19,4), low decimal(19,4), "
    "close decimal(19,4), adjclose decimal(19,4), "
    "volume int"
)


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 8, Finished, Available)

In [7]:
#
#  F1 - remove existing tables
#

# Drop the bronze table, then the silver table. "if exists" keeps the drop from
# failing when a table is missing; later cells rebuild both with "create table".
for layer in ('bronze', 'silver'):
    stmt = f'drop table if exists {layer}_{var_table}_full;'
    ret = spark.sql(stmt)


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 9, Finished, Available)

In [8]:
#
#  F2 - load data frame
#

# root path - every file under this folder is read
path = 'Files/' + var_path

# tmp table - name of the temporary view registered below
table = 'tmp_' + var_table.strip() + '_full'

# schema flag - a schema is applied unless none was supplied.
# Handled explicitly instead of relying on a bare "except: pass", which hid
# every error raised while inspecting var_schema.
if var_schema is None:
    schema_flag = False
elif isinstance(var_schema, str):
    # a blank or whitespace-only string counts as "no schema"
    schema_flag = bool(var_schema.strip())
else:
    # anything that is not a string is handed to the reader unchanged
    schema_flag = True

# reader options shared by both load modes
reader = spark.read.format("csv") \
    .option("header", var_header) \
    .option("delimiter", var_delimiter) \
    .option("recursiveFileLookup", "true")

# enforce the supplied schema when there is one
if schema_flag:
    reader = reader.schema(var_schema)

# load all files under the root path
df = reader.load(path)

# convert to view so the following cells can query the data with spark sql
df.createOrReplaceTempView(table)

# debugging
print(f"data lake path - {path}")
print(f"temporary view - {table}")


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 10, Finished, Available)

data lake path - Files/raw/stocks/S&P-2013
temporary view - tmp_snp500_full


In [9]:
#
#  F3 - create bronze table (all files)
#

# object names used by the statement
bronze_table = f"bronze_{var_table}_full"
source_view = f"tmp_{var_table}_full"

# spark sql - copy every row of the temporary view and add three audit columns:
#   _load_date   - timestamp taken when the statement runs
#   _folder_name - second-to-last segment of the file path (assumes 1 level nesting on dir)
#   _file_name   - last segment of the file path, keeping only the text before any '?'
stmt = f"""
  create table {bronze_table} as
  select 
    *, 
    current_timestamp() as _load_date,
    reverse(split(input_file_name(), '/'))[1] as _folder_name,
    split_part(reverse(split(input_file_name(), '/'))[0], '?', 1) as _file_name
  from 
    {source_view} 
"""

# create table
ret = spark.sql(stmt)

# debugging - echo the statement that was executed
print("execute spark sql - \n " + stmt)


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 11, Finished, Available)

execute spark sql - 
 
  create table bronze_snp500_full as
  select 
    *, 
    current_timestamp() as _load_date,
    reverse(split(input_file_name(), '/'))[1] as _folder_name,
    split_part(reverse(split(input_file_name(), '/'))[0], '?', 1) as _file_name
  from 
    tmp_snp500_full 



In [10]:
#
#  F4 - create silver table (latest file)
#

# object names used by the statement
bronze_table = f"bronze_{var_table}_full"
silver_table = f"silver_{var_table}_full"
cte_name = f"cte_{var_table}"

# spark sql - the cte selects every row of the bronze table and the outer query
# returns the cte unchanged, so silver ends up as a copy of bronze.
# NOTE(review): the heading says "latest file" but the statement has no filter
# on _folder_name / _file_name - confirm that copying all rows is intended.
stmt = f"""
  create table {silver_table} as
  with {cte_name} as
  (
    select * 
    from {bronze_table} as l
  )
  select 
    *
  from 
    {cte_name}
"""

# create table
ret = spark.sql(stmt)

# debugging - echo the statement that was executed
print("execute spark sql - \n " + stmt)


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 12, Finished, Available)

execute spark sql - 
 
  create table silver_snp500_full as
  with cte_snp500 as
  (
    select * 
    from bronze_snp500_full as l
  )
  select 
    *
  from 
    cte_snp500



In [11]:
#
#  Grab bronze count
#

# Count the rows in the bronze table. A failed query still yields 0 so the
# notebook keeps running, but the error is reported instead of being hidden
# by a bare "except:" (which would also swallow a keyboard interrupt).
sql_stmt = f"select count(*) from bronze_{var_table}_full"
try:
    bronze_recs = spark.sql(sql_stmt).first()[0]
except Exception as err:
    print(f"Unable to count bronze_{var_table}_full - {err}")
    bronze_recs = 0

# show values
print(f"The bronze_{var_table}_full record count is {bronze_recs}")


StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 13, Finished, Available)

The bronze_snp500_full record count is 125225


In [12]:
#
#  Grab silver count
#

# Count the rows in the silver table. A failed query still yields 0 so the
# notebook keeps running, but the error is reported instead of being hidden
# by a bare "except:" (which would also swallow a keyboard interrupt).
sql_stmt = f"select count(*) from silver_{var_table}_full"
try:
    silver_recs = spark.sql(sql_stmt).first()[0]
except Exception as err:
    print(f"Unable to count silver_{var_table}_full - {err}")
    silver_recs = 0

# show values
print(f"The silver_{var_table}_full record count is {silver_recs}")

StatementMeta(, cc20a134-b501-4a31-91a5-ef39411d81e0, 14, Finished, Available)

The silver_snp500_full record count is 125225
