This notebook shows you how to create and query a table or DataFrame loaded from data stored in Azure Blob storage.

### Step 1: Set the data location and type

There are two ways to access Azure Blob storage: account keys and shared access signatures (SAS). This notebook uses an account key that is read from a Databricks secret scope.

To get started, we need to set the location and type of the file.

In [0]:
storage_account_name = "dataingest002581"
container_name = "electricity-data"

In [0]:
scope_name = "electricity-scope"
print(dbutils.secrets.list(scope_name))

In [0]:
file_location = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"
conf_key = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
mount_point = f"/mnt/{container_name}"

if mount_point in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
  dbutils.fs.unmount(mount_point)

In [0]:
dbutils.fs.mount(
  source = file_location,
  mount_point = mount_point,
  extra_configs = {conf_key: dbutils.secrets.get(scope = scope_name, key = "storage-account-access-key")})
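
The mount above authenticates with an account key. As a rough sketch of how the source URL and the configuration key are composed for either access method, the helpers below rebuild both strings in plain Python. The function names are hypothetical, and the SAS key format (`fs.azure.sas.<container>.<account>.blob.core.windows.net`) is an assumption to check against the Databricks documentation, since nothing in this notebook exercises it.

```python
def wasbs_source(container_name: str, storage_account_name: str) -> str:
    """Build the wasbs:// URL for a container (same format as file_location)."""
    return f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"


def auth_conf_key(storage_account_name: str, container_name: str,
                  method: str = "account_key") -> str:
    """Return the configuration key name for the chosen access method."""
    host = f"{storage_account_name}.blob.core.windows.net"
    if method == "account_key":
        # Same format as conf_key in the notebook.
        return f"fs.azure.account.key.{host}"
    if method == "sas":
        # Assumed format for SAS tokens, scoped to a single container.
        return f"fs.azure.sas.{container_name}.{host}"
    raise ValueError(f"unknown method: {method!r}")


source = wasbs_source("electricity-data", "dataingest002581")
conf_key_example = auth_conf_key("dataingest002581", "electricity-data")
print(source)
print(conf_key_example)
```

Whichever method is used, the secret value itself should still come from `dbutils.secrets.get` rather than being written into the notebook.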

In [0]:
import os
import numpy as np
import databricks.koalas as ks

dbutils.fs.mkdirs(os.path.join(mount_point, 'bronze'))
dbutils.fs.ls(mount_point + '/bronze')
print("Raw Files:")
for f in dbutils.fs.ls(mount_point):
  print("Path:", f.path, "\tSize:", f.size)
print("\n")

### Step 2: Read the data

Now that we have specified our file metadata, we can create a DataFrame. Rather than inferring the schema separately for every file, we read one file with its header row, normalize the column names, and derive a DDL schema string from the result. That string is then passed as an explicit schema when the remaining files are loaded.

First, let's create a DataFrame in Python.

In [0]:
file_list = dbutils.fs.ls(mount_point)
# file_location = file_list[2].path
file_location = "dbfs:/mnt/electricity-data/f923_2011.csv"
file_type = "csv"
write_location = file_list[0].path + file_location.split('/')[-1].split('.')[0]
print(f"Read Location: {file_location}")
print(f"Write Location: {write_location}")

In [0]:
# Read one file with its header row to learn the column names and types.
df = ks.read_csv(file_location, header=0)
# Normalize column names: no spaces, ampersands, dots, or parentheses.
df.columns = [s.replace(" ", "_").replace("&", "and").replace(".", "_") for s in df.columns]
df.columns = [s.replace("(", "").replace(")", "") for s in df.columns]
# Turn "struct<a:int,b:string>" into the DDL string "a int,b string".
ddl = df.spark.schema().simpleString().split("<")[1].split(">")[0]
ddl = ddl.replace(':', ' ')
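
The string handling in this cell can be tried without a cluster. The following is a minimal sketch, with hypothetical helper names, that applies the same replacements to a column name and the same parsing to a schema string. It assumes a flat schema: a nested struct, array, or map type would contain extra `<`, `>`, or `:` characters and break the parsing.

```python
def normalize_column(name: str) -> str:
    """Apply the notebook's column-name replacements to a single name."""
    cleaned = name.replace(" ", "_").replace("&", "and").replace(".", "_")
    return cleaned.replace("(", "").replace(")", "")


def simple_string_to_ddl(simple: str) -> str:
    """Convert 'struct<a:int,b:string>' into 'a int,b string' (flat schemas only)."""
    inner = simple.split("<")[1].split(">")[0]
    return inner.replace(":", " ")


print(normalize_column("Combined Heat & Power Plant"))
print(simple_string_to_ddl("struct<Plant_Id:double,Plant_Name:string>"))
```

The first call reproduces the `Combined_Heat_and_Power_Plant` column name that appears in the table output later in the notebook.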

In [0]:
dbutils.fs.ls(mount_point + "/bronze")

In [0]:
for path in [f.path for f in dbutils.fs.ls(mount_point) if f.size > 0]:
  name = path.split('/')[-1].split('.')[0]
  write_location = f"dbfs:/mnt/electricity-data/bronze/{name}"
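  # header=None with an explicit DDL schema: each file's own header line is read as a data row,
  # so it has to be filtered out downstream (for example, by dropping rows with a null Plant_Id).
  df = ks.read_csv(path, header=None, names=ddl)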
  print(f"header length {name}: {df.columns.size}")
  assert df.columns[-1].lower() == 'year', f"{name} last field should be 'year'"
  try:
    df.to_delta(write_location)
  except Exception as err:
    print(f"{err} in {path.split(r'/')[-1]}")
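
As a small illustration of how the loop derives each write location, the sketch below repeats the same `split` logic in a standalone function. The function name and the `root` parameter are hypothetical. The guard for an empty name is an addition: it covers directory-style paths that end in `/`, which the loop itself avoids through its `f.size > 0` filter.

```python
def bronze_location(path: str, root: str = "dbfs:/mnt/electricity-data") -> str:
    """Derive the Delta write location for a raw file path."""
    file_name = path.split("/")[-1]   # e.g. "f923_2011.csv"
    name = file_name.split(".")[0]    # e.g. "f923_2011"
    if not name:
        # A trailing slash leaves no file name to work with.
        raise ValueError(f"no file name in path: {path!r}")
    return f"{root}/bronze/{name}"


print(bronze_location("dbfs:/mnt/electricity-data/f923_2011.csv"))
```

Because the name is cut at the first dot, a file such as `f923.v2.csv` would map to `f923`, so this scheme relies on file names containing a single dot.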

In [0]:
dbutils.fs.mkdirs(os.path.join(mount_point, 'silver'))
dbutils.fs.mkdirs(os.path.join(mount_point, 'gold'))
for f in dbutils.fs.ls(mount_point):
  if f.size <= 0:
    print(f.path)
# [print(f.path) for f in dbutils.fs.ls(os.path.join(mount_point, 'bronze', 'f923_2020'))]

In [0]:
silver_path = os.path.join(mount_point, 'silver')

In [0]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
# df = spark.read.format("delta").load(os.path.join(mount_point, 'bronze', 'f923_2008')).to_koalas()
df = ks.read_delta(os.path.join(mount_point, 'bronze', 'f923_2008'))
df = df.loc[df['Plant_Id'].notnull(), :]
df = df.drop(['Operator_Id'], axis=1)
# Drop the reserved columns and the per-month columns.
for prefix in ("Reserved", "Netgen_", "Elec_MMBtu", "Tot_MMBtu", "MMBtuPer_Unit_", "Elec_Quantity_", "Quantity_"):
  df = df.drop(df.columns[df.columns.str.startswith(prefix)], axis=1)
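
To see which columns these prefix filters remove and which survive, the sketch below applies the same prefixes to a handful of column names taken from the table output further down. It is plain Python with hypothetical names (`DROP_PREFIXES`, `split_columns`) and does not touch the DataFrame.

```python
DROP_PREFIXES = ("Reserved", "Netgen_", "Elec_MMBtu", "Tot_MMBtu",
                 "MMBtuPer_Unit_", "Elec_Quantity_", "Quantity_")


def split_columns(columns, prefixes=DROP_PREFIXES):
    """Return (kept, dropped) column names for the given prefixes."""
    prefixes = tuple(prefixes)
    dropped = [c for c in columns if c.startswith(prefixes)]
    kept = [c for c in columns if not c.startswith(prefixes)]
    return kept, dropped


sample = ["Plant_Id", "Reserved_1", "Quantity_Jan", "Elec_Quantity_Jan",
          "MMBtuPer_Unit_Jan", "Tot_MMBtuJan", "Elec_MMBtuJan", "Netgen_Dec",
          "Elec_Fuel_Consumption_MMBtu", "Net_Generation_Megawatthours", "YEAR"]
kept, dropped = split_columns(sample)
print("kept:", kept)
print("dropped:", dropped)
```

The annual totals such as `Elec_Fuel_Consumption_MMBtu` and `Net_Generation_Megawatthours` are kept because none of the prefixes match them.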

In [0]:
ks.set_option('compute.ops_on_diff_frames', True)
# np.float was only an alias for the built-in float and has been removed from recent NumPy releases.
int_cols = ['Plant_Id', 'Nuclear_Unit_Id', 'NAICS_Code', 'EIA_Sector_Number', 'YEAR']
df[int_cols] = df[int_cols].apply(lambda col: col.astype(float).astype("Int32"))
ks.set_option('compute.ops_on_diff_frames', False)

In [0]:
write_path = mount_point + '/silver/f923_ytd_2008'
# df.to_delta(write_path, mode='overwrite')
df.to_spark_io(write_path, format='delta', mode='overwrite')

In [0]:
df = df.loc[df['Plant_Id'].notnull(), :]
df['Plant_Id'] = df['Plant_Id'].map(int)

In [0]:
tbl = spark.read.format("delta").load(os.path.join(mount_point, 'bronze', 'f923_2009'))
display(tbl)

Plant_Id,Combined_Heat_and_Power_Plant,Nuclear_Unit_Id,Plant_Name,Operator_Name,Operator_Id,State,Census_Region,NERC_Region,Reserved,NAICS_Code,EIA_Sector_Number,Sector_Name,Reported_Prime_Mover,Reported_Fuel_Type_Code,AER_Fuel_Type_Code,Reserved_1,Reserved_2,Physical_Unit_Label,Quantity_Jan,Quantity_Feb,Quantity_Mar,Quantity_Apr,Quantity_May,Quantity_Jun,Quantity_Jul,Quantity_Aug,Quantity_Sep,Quantity_Oct,Quantity_Nov,Quantity_Dec,Elec_Quantity_Jan,Elec_Quantity_Feb,Elec_Quantity_Mar,Elec_Quantity_Apr,Elec_Quantity_May,Elec_Quantity_Jun,Elec_Quantity_Jul,Elec_Quantity_Aug,Elec_Quantity_Sep,Elec_Quantity_Oct,Elec_Quantity_Nov,Elec_Quantity_Dec,MMBtuPer_Unit_Jan,MMBtuPer_Unit_Feb,MMBtuPer_Unit_Mar,MMBtuPer_Unit_Apr,MMBtuPer_Unit_May,MMBtuPer_Unit_Jun,MMBtuPer_Unit_Jul,MMBtuPer_Unit_Aug,MMBtuPer_Unit_Sep,MMBtuPer_Unit_Oct,MMBtuPer_Unit_Nov,MMBtuPer_Unit_Dec,Tot_MMBtuJan,Tot_MMBtuFeb,Tot_MMBtuMar,Tot_MMBtuApr,Tot_MMBtuMay,Tot_MMBtuJun,Tot_MMBtuJul,Tot_MMBtuAug,Tot_MMBtuSep,Tot_MMBtuOct,Tot_MMBtuNov,Tot_MMBtuDec,Elec_MMBtuJan,Elec_MMBtuFeb,Elec_MMBtuMar,Elec_MMBtuApr,Elec_MMBtuMay,Elec_MMBtuJun,Elec_MMBtuJul,Elec_MMBtuAug,Elec_MMBtuSep,Elec_MMBtuOct,Elec_MMBtuNov,Elec_MMBtuDec,Netgen_Jan,Netgen_Feb,Netgen_Mar,Netgen_Apr,Netgen_May,Netgen_Jun,Netgen_Jul,Netgen_Aug,Netgen_Sep,Netgen_Oct,Netgen_Nov,Netgen_Dec,Total_Fuel_Consumption_Quantity,Electric_Fuel_Consumption_Quantity,Total_Fuel_Consumption_MMBtu,Elec_Fuel_Consumption_MMBtu,Net_Generation_Megawatthours,YEAR
,Combined Heat & Power Plant,,Plant Name,Operator Name,,State,Census Region,NERC Region,Reserved,,,Sector Name,Reported Prime Mover,Reported Fuel Type Code,AER Fuel Type Code,Reserved .1,Reserved .2,Physical Unit Label,QUANTITY_JAN,QUANTITY_FEB,QUANTITY_MAR,QUANTITY_APR,QUANTITY_MAY,QUANTITY_JUN,QUANTITY_JUL,QUANTITY_AUG,QUANTITY_SEP,QUANTITY_OCT,QUANTITY_NOV,QUANTITY_DEC,ELEC_QUANTITY_JAN,ELEC_QUANTITY_FEB,ELEC_QUANTITY_MAR,ELEC_QUANTITY_APR,ELEC_QUANTITY_MAY,ELEC_QUANTITY_JUN,ELEC_QUANTITY_JUL,ELEC_QUANTITY_AUG,ELEC_QUANTITY_SEP,ELEC_QUANTITY_OCT,ELEC_QUANTITY_NOV,ELEC_QUANTITY_DEC,MMBTU_PER_UNIT_JAN,MMBTU_PER_UNIT_FEB,MMBTU_PER_UNIT_MAR,MMBTU_PER_UNIT_APR,MMBTU_PER_UNIT_MAY,MMBTU_PER_UNIT_JUN,MMBTU_PER_UNIT_JUL,MMBTU_PER_UNIT_AUG,MMBTU_PER_UNIT_SEP,MMBTU_PER_UNIT_OCT,MMBTU_PER_UNIT_NOV,MMBTU_PER_UNIT_DEC,TOT_MMBTU_JAN,TOT_MMBTU_FEB,TOT_MMBTU_MAR,TOT_MMBTU_APR,TOT_MMBTU_MAY,TOT_MMBTU_JUN,TOT_MMBTU_JUL,TOT_MMBTU_AUG,TOT_MMBTU_SEP,TOT_MMBTU_OCT,TOT_MMBTU_NOV,TOT_MMBTU_DEC,ELEC_MMBTUS_JAN,ELEC_MMBTUS_FEB,ELEC_MMBTUS_MAR,ELEC_MMBTUS_APR,ELEC_MMBTUS_MAY,ELEC_MMBTUS_JUN,ELEC_MMBTUS_JUL,ELEC_MMBTUS_AUG,ELEC_MMBTUS_SEP,ELEC_MMBTUS_OCT,ELEC_MMBTUS_NOV,ELEC_MMBTUS_DEC,NETGEN_JAN,NETGEN_FEB,NETGEN_MAR,NETGEN_APR,NETGEN_MAY,NETGEN_JUN,NETGEN_JUL,NETGEN_AUG,NETGEN_SEP,NETGEN_OCT,NETGEN_NOV,NETGEN_DEC,,,,,,
2.0,N,,Bankhead Dam,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,HY,WAT,HYC,,,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,282335.0,119344.0,256672.0,220520.0,286025.0,116444.0,83031.0,110115.0,202894.0,333608.0,328891.0,418871.0,282335.0,119344.0,256672.0,220520.0,286025.0,116444.0,83031.0,110115.0,202894.0,333608.0,328891.0,418871.0,28927.787,12227.884,26298.379,22594.291,29305.852,11930.788,8507.325,11282.228,20788.348,34181.161,33697.834,42917.123,0.0,0.0,,,282659.0,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,CA,NG,NG,,,mcf,0.0,0.0,4002.0,40457.0,41355.0,66851.0,63820.0,36144.0,24808.0,12874.0,3118.0,11078.0,0.0,0.0,4002.0,40457.0,41355.0,66851.0,63820.0,36144.0,24808.0,12874.0,3118.0,11078.0,0.0,0.0,1.03,1.032,1.03,1.028,1.0310000000000001,1.03,1.025,1.025,1.035,1.0290000000000001,0.0,0.0,4122.0,41752.0,42596.0,68723.0,65798.0,37228.0,25428.0,13196.0,3227.0,11399.0,0.0,0.0,4122.0,41752.0,42596.0,68723.0,65798.0,37228.0,25428.0,13196.0,3227.0,11399.0,96886.0,113846.0,240866.0,225797.0,207237.0,216324.0,212319.0,193326.0,150609.0,122105.0,161193.0,215922.0,304507.0,304507.0,,,2156430.0,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,CT,NG,NG,,,mcf,1902799.0,2220450.0,4573032.0,4219766.0,3831539.0,4014519.0,3921115.0,3659204.0,2855972.0,2310527.0,3063830.0,4105081.0,1902799.0,2220450.0,4573032.0,4219766.0,3831539.0,4014519.0,3921115.0,3659204.0,2855972.0,2310527.0,3063830.0,4105081.0,1.036,1.032,1.03,1.032,1.03,1.028,1.0310000000000001,1.03,1.025,1.025,1.035,1.0290000000000001,1971300.0,2291504.0,4710223.0,4354799.0,3946485.0,4126926.0,4042670.0,3768980.0,2927371.0,2368290.0,3171064.0,4224128.0,1971300.0,2291504.0,4710223.0,4354799.0,3946485.0,4126926.0,4042670.0,3768980.0,2927371.0,2368290.0,3171064.0,4224128.0,186900.0,211673.0,434096.0,403362.0,366672.0,377432.0,373913.0,346778.0,271718.0,220878.0,298334.0,396736.0,40677834.0,40677834.0,,,3888492.0,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,ST,BIT,COL,,,short tons,383631.0,299011.0,278815.0,202224.0,346209.0,277557.0,265612.0,376269.0,374736.0,285904.0,233650.0,202916.0,383631.0,299011.0,278815.0,202224.0,346209.0,277557.0,265612.0,376269.0,374736.0,285904.0,233650.0,202916.0,22.528000000000002,22.758000000000003,22.462,22.214000000000002,22.646,22.305,22.919,22.674,22.406000000000002,22.306,22.324,22.055,8642439.0,6804892.0,6262743.0,4492204.0,7840249.0,6190909.0,6087561.0,8531523.0,8396335.0,6377375.0,5216003.0,4475312.0,8642439.0,6804892.0,6262743.0,4492204.0,7840249.0,6190909.0,6087561.0,8531523.0,8396335.0,6377375.0,5216003.0,4475312.0,875865.983,687562.108,630413.284,441720.491,789099.665,618490.55,603813.779,839447.628,847583.777,654591.744,517687.049,441427.926,3526534.0,3526534.0,,,7947703.984,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,ST,DFO,DFO,,,barrels,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,,0.0,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,ST,NG,NG,,,mcf,28469.0,22725.0,128729.0,117539.0,53000.0,28814.0,23872.0,56843.0,22395.0,50361.0,76097.0,311276.0,28469.0,22725.0,128729.0,117539.0,53000.0,28814.0,23872.0,56843.0,22395.0,50361.0,76097.0,311276.0,1.045,1.0330000000000001,1.03,1.032,1.03,1.026,1.0310000000000001,1.0310000000000001,1.025,1.024,1.035,1.0270000000000001,29750.0,23475.0,132591.0,121300.0,54590.0,29563.0,24612.0,58605.0,22955.0,51570.0,78760.0,319680.0,29750.0,23475.0,132591.0,121300.0,54590.0,29563.0,24612.0,58605.0,22955.0,51570.0,78760.0,319680.0,3015.0170000000003,2371.8920000000003,13346.716,11927.509,5494.335,2953.45,2441.221,5766.372,2317.223,5293.256,7816.951,31532.074,920120.0,920120.0,,,94276.016,2009.0
3.0,N,,Barry,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,ST,SUB,COL,,,short tons,,,,,,,,,,0.0,,,,,,,,,,,,0.0,,,,,,,,,,,,0.0,,,,,,,,,,,,0.0,,,,,,,,,,,,0.0,,,,,,,,,,,,0.0,,,0.0,0.0,,,0.0,2009.0
4.0,N,,Walter Bouldin Dam,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,HY,WAT,HYC,,,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1010219.0,357079.0,1099825.0,901785.0,930343.0,139724.0,127807.0,133488.0,683551.0,1228891.0,844669.0,970534.0,1010219.0,357079.0,1099825.0,901785.0,930343.0,139724.0,127807.0,133488.0,683551.0,1228891.0,844669.0,970534.0,103506.0,36586.0,112687.0,92396.0,95322.0,14316.0,13095.0,13677.0,70036.0,125911.0,86544.0,99440.0,0.0,0.0,,,863516.0,2009.0
7.0,N,,Gadsden,Alabama Power Co,195.0,AL,ESC,SERC,,22.0,1.0,Electric Utility,ST,AB,ORW,,,short tons,110.0,52.0,0.0,0.0,113.0,619.0,389.0,573.0,552.0,490.0,230.0,205.0,110.0,52.0,0.0,0.0,113.0,619.0,389.0,573.0,552.0,490.0,230.0,205.0,10.0,10.0,0.0,0.0,7.734,8.198,8.198,8.632,9.281,11.498000000000001,11.446,10.106,1100.0,520.0,0.0,0.0,874.0,5075.0,3189.0,4946.0,5123.0,5634.0,2633.0,2072.0,1100.0,520.0,0.0,0.0,874.0,5075.0,3189.0,4946.0,5123.0,5634.0,2633.0,2072.0,78.506,32.975,0.0,0.0,65.626,353.355,239.165,340.908,321.485,351.63100000000003,148.887,117.284,3333.0,3333.0,,,2049.822,2009.0
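
The first data row in the output above is the file's own header line: its text fields hold column titles and its numeric fields are empty. This is consistent with the bronze load reading each file with `header=None` and an explicit schema. The sketch below reproduces the effect with the standard `csv` module on a tiny made-up sample. The sample data and helper name are hypothetical, and the null-on-parse-failure behavior only approximates what Spark does with an explicit schema.

```python
import csv
import io

# Made-up sample: a header line followed by two data lines.
raw = io.StringIO(
    "Plant Id,Plant Name,YEAR\n"
    "2,Bankhead Dam,2009\n"
    "3,Barry,2009\n"
)
names = ["Plant_Id", "Plant_Name", "YEAR"]


def to_float_or_none(text):
    """Parse a number, returning None when the text is not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


rows = []
for record in csv.reader(raw):
    # No header handling: every line, including the header, becomes a row.
    row = dict(zip(names, record))
    row["Plant_Id"] = to_float_or_none(row["Plant_Id"])
    rows.append(row)

# Dropping rows with a missing Plant_Id removes the stray header row.
data_rows = [r for r in rows if r["Plant_Id"] is not None]
print(len(rows), len(data_rows))
```

This is why the silver step filters on `df['Plant_Id'].notnull()` before doing anything else.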


In [0]:
spark.sql("""CREATE DATABASE IF NOT EXISTS f923""")

In [0]:
# Table identifiers must be unquoted or backtick-quoted; single quotes denote a string literal.
spark.sql("""CREATE TABLE IF NOT EXISTS f923.generation_and_fuel_data USING DELTA PARTITIONED BY (year) LOCATION 'dbfs:/mnt/electricity-data/silver/generation_and_fuel_data'""")

In [0]:
df.columns

### Step 3: Query the data

Now that we have created our DataFrame, we can query it. For instance, you can identify particular columns to select and display.

In [0]:
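# df is a Koalas DataFrame at this point, so convert it to a Spark DataFrame before calling select().
display(df.to_spark().select("EXAMPLE_COLUMN"))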

### Step 4: (Optional) Create a view or table

If you want to query this data as a table, you can simply register it as a *view* or a table.

In [0]:
# Temp views are registered on Spark DataFrames, so convert the Koalas DataFrame first.
df.to_spark().createOrReplaceTempView("YOUR_TEMP_VIEW_NAME")

We can query this view using Spark SQL. For instance, we can perform a simple aggregation. Notice how we can use `%sql` to query the view from SQL.

In [0]:
%sql

SELECT EXAMPLE_GROUP, SUM(EXAMPLE_AGG) FROM YOUR_TEMP_VIEW_NAME GROUP BY EXAMPLE_GROUP

Since this table is registered as a temp view, it will be available only to this notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.

In [0]:
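# The DataFrameWriter lives on the Spark DataFrame, so convert the Koalas DataFrame first.
df.to_spark().write.format("parquet").saveAsTable("MY_PERMANENT_TABLE_NAME")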

This table will persist across cluster restarts and allow various users across different notebooks to query this data.