
This notebook shows you how to create and query a table or DataFrame loaded from data stored in Azure Blob storage.


### Step 1: Set the data location and type

There are two ways to access Azure Blob storage: account keys and shared access signatures (SAS).
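With the WASB driver used in this notebook, both methods come down to a Spark property whose name includes the storage account, and for SAS also the container. The sketch below builds the two property names that appear later in this notebook. The helper names are hypothetical, and the functions only build strings; they do not contact Azure. The practical difference between the two methods is scope. An account key grants full access to every container in the account. A SAS token can be limited to specific permissions and an expiry time.

```python
def account_key_conf(account: str) -> str:
    """Spark property name for account-key access to a whole storage account (WASB driver)."""
    return f"fs.azure.account.key.{account}.blob.core.windows.net"


def sas_conf(container: str, account: str) -> str:
    """Spark property name for SAS-token access to a single container (WASB driver)."""
    return f"fs.azure.sas.{container}.{account}.blob.core.windows.net"


# In a Databricks notebook these names would be passed to spark.conf.set, e.g.:
#   spark.conf.set(account_key_conf("<storage-account>"), "<account-key>")
#   spark.conf.set(sas_conf("<container>", "<storage-account>"), "<sas-token>")
print(account_key_conf("bigdatastorageszokolin"))
print(sas_conf("bigdatablobszokolin", "bigdatastorageszokolin"))
```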

To get started, we need to set the location and type of the file.
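Spark addresses Blob storage through the `wasbs://` scheme, which is the TLS variant of `wasb://`. The container name goes before the `@` and the account host goes after it. The following minimal sketch builds such a path. The helper name `wasbs_path` is hypothetical.

```python
def wasbs_path(container: str, account: str, path: str = "") -> str:
    """Build wasbs://<container>@<account>.blob.core.windows.net/<path>."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path.lstrip('/')}"


print(wasbs_path("bigdatablobszokolin", "bigdatastorageszokolin", "Google.csv"))
# wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.net/Google.csv
```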

In [0]:
storage_account_name = "bigdatastorageszokolin"
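# Avoid hard-coding the account key in a notebook; read it from a Databricks secret scope instead.
# The scope and key names below are hypothetical placeholders.
storage_account_access_key = dbutils.secrets.get(scope="azure-storage", key="storage-account-access-key")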

blob_name = "bigdatablobszokolin"  # name of the blob container

In [0]:
# wasbs URIs have the form wasbs://<container>@<account>.blob.core.windows.net/<path>
file_location = f"wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net/Google.csv"
file_type = "csv"
# file_type = "json"
# file_type = "delta"

In [0]:
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)


### Step 2: Read the data

Now that we have specified our file metadata, we can create a DataFrame. Notice that we use an *option* to specify that we want to infer the schema from the file. We can also explicitly set this to a particular schema if we have one already.
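Schema inference checks the values in each column and picks a type that fits all of them. As a result, a single value that does not parse as a number turns the whole column into a string. This may be what happened to `price` here. The query plan in Step 4 shows `price:string` in the read schema, presumably because at least one row's `price` field did not parse as a number. The function below is a simplified illustration of that widening idea only. It is not Spark's inference code, and the sample rows are made up. It also treats the first row as column names, as `header=true` does.

```python
import csv
import io


def infer_type(values):
    """Return 'int', 'double' or 'string': the narrowest type that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]  # empty fields become nulls and are skipped
    if not non_empty:
        return "string"
    inferred = "int"
    for value in non_empty:
        if inferred == "int":
            try:
                int(value)
                continue
            except ValueError:
                inferred = "double"  # widen, then re-check this value as a double
        try:
            float(value)
        except ValueError:
            return "string"  # one non-numeric value widens the whole column to string
    return inferred


def infer_schema(csv_text):
    """Infer a {column: type} mapping from CSV text whose first row is a header."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    return {name: infer_type([row[i] for row in data]) for i, name in enumerate(header)}


sample = "name,price\nquickbooks,38.99\nsuperstart,8.49\n"  # made-up rows
print(infer_schema(sample))                           # {'name': 'string', 'price': 'double'}
print(infer_schema(sample + "qb pos,gbp 637.99\n"))   # {'name': 'string', 'price': 'string'}
```

To avoid inference, pass an explicit schema. `spark.read.schema(...)` accepts a DDL string such as `"id STRING, name STRING, description STRING, manufacturer STRING, price DOUBLE"`. In the default PERMISSIVE mode, fields that cannot be converted become null instead of changing the column type.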

First, as a quick check, let's create a small DataFrame directly from Python data.

In [0]:
data1 = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df1 = spark.createDataFrame(data1, ["Name", "Age"])

In [0]:
df1.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [0]:
display(df1.select("Name"))

Name
Alice
Bob
Charlie


In [0]:
# List the files in the container (requires the account key set above)
dbutils.fs.ls(f"wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net/")

[FileInfo(path='wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.net/Google.csv', name='Google.csv', size=1070774, modificationTime=1701771724000),
 FileInfo(path='wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.net/World-Stock-Prices-Dataset.csv', name='World-Stock-Prices-Dataset.csv', size=41042247, modificationTime=1702301003000),
 FileInfo(path='wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.net/recipes.json', name='recipes.json', size=17193424, modificationTime=1702300961000),
 FileInfo(path='wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.net/testfile_new/', name='testfile_new/', size=0, modificationTime=0)]

In [0]:
# Optional: write df1 to the container as CSV, or save it to DBFS through pandas
# df1.write.csv(f"wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net/testfile_new")

# pandas_df = df1.toPandas()
# pandas_df.to_csv("/dbfs/FileStore/tables/pandas.csv", encoding="UTF-8")

# print(pandas_df.head())

# %fs ls dbfs:/FileStore/tables

In [0]:
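# # %fs is a magic command: it must be the first line of its own cell, and it likely does not
# # substitute Python variables, so {blob_name} would be passed literally. Use dbutils.fs.ls instead.
# %fs ls wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net/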

In [0]:
# Read both CSV files. "header" takes column names from the first row;
# "inferSchema" guesses column types, which costs an extra pass over the data.
# Generic form using the Step 1 variables:
# df = spark.read.format(file_type).option("header", "true").option("inferSchema", "true").load(file_location)

container_url = f"wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net"

df = spark.read.option("inferSchema", "true").option("header", "true").csv(f"{container_url}/Google.csv")
df_stock = spark.read.option("inferSchema", "true").option("header", "true").csv(f"{container_url}/World-Stock-Prices-Dataset.csv")

df.show(5)
df_stock.show(5)

+--------------------+--------------------+--------------------+--------------+------+
|                  id|                name|         description|  manufacturer| price|
+--------------------+--------------------+--------------------+--------------+------+
|http://www.google...|learning quickboo...|learning quickboo...|        intuit| 38.99|
|http://www.google...|superstart! fun w...|fun with reading ...|          NULL|  8.49|
|http://www.google...|qb pos 6.0 basic ...|qb pos 6.0 basic ...|        intuit|637.99|
|http://www.google...|math missions: th...|save spectacle ci...|          NULL| 12.95|
|http://www.google...|production prem c...|adobe cs3 product...|adobe software|805.99|
+--------------------+--------------------+--------------------+--------------+------+
only showing top 5 rows

+-------------------+------------------+-----------------+------------------+------------------+---------+---------+------------+----------+------+-------------+-------+
|               Date| 

In [0]:
# # Alternative, not used in this notebook: SAS-token access or a mount point.
# # Mounts are a legacy access pattern in Databricks and may not be available on every cluster.
# containerName = blob_name
# storageAccountName = storage_account_name
# sas = "<sas-token>"  # placeholder; do not paste a real token into the notebook
# config = "fs.azure.sas." + containerName + "." + storageAccountName + ".blob.core.windows.net"
# spark.conf.set(config, sas)

# dbutils.fs.mount(
#   source = f"wasbs://{blob_name}@{storage_account_name}.blob.core.windows.net/",
#   mount_point = "/mnt/storage",
#   extra_configs = {"fs.azure.account.key." + storage_account_name + ".blob.core.windows.net": storage_account_access_key})

# # Paths under the mount point are relative to the container root
# df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/storage/Google.csv")

In [0]:
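# Only needed for the azure-storage-blob SDK cells below.
# dbutils.library.installPyPI is deprecated; install with %pip in its own cell instead:
# %pip install azure-storage-blob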

In [0]:
# Build the connection string from the variables above instead of pasting the key in plain text
# connection = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_access_key};EndpointSuffix=core.windows.net"

# from azure.storage.blob import BlobClient

In [0]:
# client = BlobClient.from_connection_string(connection, container_name=blob_name, blob_name="Google.csv")

In [0]:
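# # Upload the file written to /dbfs/FileStore earlier as a new blob.
# # `client` points at Google.csv, and upload_blob() raises an error if the blob already
# # exists (unless overwrite=True), so use a client for a new blob name.
# upload_client = BlobClient.from_connection_string(connection, container_name=blob_name, blob_name="pandas.csv")
# with open("/dbfs/FileStore/tables/pandas.csv", "rb") as f:
#   upload_client.upload_blob(f)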

In [0]:
# # Download Google.csv as bytes (this returned an access error when the notebook was run)
# google_data = client.download_blob().readall()


### Step 3: Query the data

Now that we have created our DataFrame, we can query it. For instance, we can filter rows by a column value and select particular columns to display.

In [0]:
# df_stock.union(df_stock).union(df_stock).count()
df_stock.count()
df_stock.filter(df_stock["Ticker"] == "AMZN").rdd.map(lambda x: x[0].upper()).collect()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2466348765832547>, line 3[0m
[1;32m      1[0m [38;5;66;03m# df_stock.union(df_stock).union(df_stock).count()[39;00m
[1;32m      2[0m df_stock[38;5;241m.[39mcount()
[0;32m----> 3[0m df_stock[38;5;241m.[39mfilter(df_stock[[38;5;124m"[39m[38;5;124mTicker[39m[38;5;124m"[39m] [38;5;241m==[39m [38;5;124m"[39m[38;5;124mAMZN[39m[38;5;124m"[39m)[38;5;241m.[39mrdd[38;5;241m.[39mmap([38;5;28;01mlambda[39;00m x: x[[38;5;241m0[39m][38;5;241m.[39mupper())[38;5;241m.[39mcollect()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     

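The traceback is cut off before the root cause, so the output does not show why the job failed. One plausible cause, which is an assumption and not confirmed by the output: `x[0]` is the first column, `Date`. If `inferSchema` parsed it as a timestamp, the lambda receives a Python `datetime`, which has no `.upper()` method. Errors raised inside `rdd.map()` happen on the executors and reach the driver wrapped in a `Py4JJavaError`. The plain-Python sketch below reproduces that failure with a made-up row and shows a string-conversion fix. The function names are hypothetical.

```python
from datetime import datetime


def upper_first_field(row):
    """Mimics the notebook's lambda: upper-case the first field of a row."""
    return row[0].upper()


def upper_first_field_safe(row):
    """Convert to str first so the lambda also works when the field is a timestamp."""
    return str(row[0]).upper()


# Made-up row shaped like (Date, Ticker) with Date already parsed as a timestamp
row = (datetime(2023, 12, 1, 0, 0), "AMZN")

try:
    upper_first_field(row)
except AttributeError as err:
    print("fails:", err)  # 'datetime.datetime' object has no attribute 'upper'

print(upper_first_field_safe(row))  # 2023-12-01 00:00:00
```

In the notebook, the equivalent change would be `lambda x: str(x[0]).upper()`. Another option is to stay in the DataFrame API with `pyspark.sql.functions.upper(col("Date").cast("string"))`, which avoids sending rows to Python workers at all.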
In [0]:
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_extract

# Stack 13 copies of the Google data to get a larger test dataset.
# df is reassigned, so re-running this cell multiplies the data again.
df = reduce(DataFrame.union, [df] * 13)

# Add a column 'number' holding the first run of digits found in the 'id' URL
pattern = r"(\d+)"
df = df.withColumn("number", regexp_extract("id", pattern, 1))

# Show the row count and the updated DataFrame
print(df.count())
df.show()

461318
+--------------------+--------------------+--------------------+--------------------+-------+--------------------+
|                  id|                name|         description|        manufacturer|  price|              number|
+--------------------+--------------------+--------------------+--------------------+-------+--------------------+
|http://www.google...|learning quickboo...|learning quickboo...|              intuit|  38.99|11125907881740407428|
|http://www.google...|superstart! fun w...|fun with reading ...|                NULL|   8.49|11538923464407758599|
|http://www.google...|qb pos 6.0 basic ...|qb pos 6.0 basic ...|              intuit| 637.99|11343515411965421256|
|http://www.google...|math missions: th...|save spectacle ci...|                NULL|  12.95|12049235575237146821|
|http://www.google...|production prem c...|adobe cs3 product...|      adobe software| 805.99|12244614697089679523|
|http://www.google...|video studio 11 plus|corel video studi...|   corel 

In [0]:
display(df.select("manufacturer", "number"))

manufacturer,number
intuit,11125907881740407428
,11538923464407758599
intuit,11343515411965421256
,12049235575237146821
adobe software,12244614697089679523
corel corporation,12311932123270766595
canopus/grass valley,13364214809208487698
intuit,12835181529749808477
intuit,12949835105262366018
sony creative software,12991998923940483779
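`regexp_extract(column, pattern, 1)` returns the first capture group of the first match, or an empty string when nothing matches. With `(\d+)` this gives the first run of digits in `id`, so it is the product number only if no other digits appear earlier in the URL. The extracted values have 20 digits, which exceeds the range of Spark's 64-bit `LongType`. A later `cast("long")` would therefore likely produce nulls, or an error with ANSI mode enabled. Keeping the values as strings or casting to `decimal(20,0)` avoids that. The plain-Python sketch below illustrates both points. It uses a hypothetical URL because the real `id` values are truncated in the output, and `regexp_extract` here is a stand-in function, not the Spark one.

```python
import re

LONG_MAX = 2**63 - 1  # largest signed 64-bit integer, the upper bound of Spark's LongType


def regexp_extract(text, pattern, group):
    """Plain-Python analogue of Spark's regexp_extract: returns '' when there is no match."""
    match = re.search(pattern, text)
    return match.group(group) if match else ""


# Hypothetical id value; the real URLs are truncated in the notebook output
url = "http://www.google.example/products/11125907881740407428"
number = regexp_extract(url, r"(\d+)", 1)
print(number)                                               # 11125907881740407428
print(int(number) > LONG_MAX)                               # True: too large for LongType
print(regexp_extract("no digits here", r"(\d+)", 1) == "")  # True
```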



### Step 4: (Optional) Create a view or table

If you want to query this data as a table, you can register it as a temporary *view* or save it as a table.

In [0]:
df.createOrReplaceTempView("YOUR_TEMP_VIEW_NAME")


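We can query this view using Spark SQL. For instance, we can perform a simple aggregation. The first cell below calls `explain()` to print the physical plan without returning results; the second runs the same query with `%sql`, which queries the view directly from SQL.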

In [0]:
spark.sql("SELECT manufacturer, SUM(price) FROM YOUR_TEMP_VIEW_NAME GROUP BY manufacturer").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[manufacturer#80], functions=[finalmerge_sum(merge sum#11367) AS sum(cast(price#81 as double))#10490])
   +- Exchange hashpartitioning(manufacturer#80, 200), ENSURE_REQUIREMENTS, [plan_id=14182]
      +- HashAggregate(keys=[manufacturer#80], functions=[partial_sum(cast(price#81 as double)) AS sum#11367])
         +- Union
            :- FileScan csv [manufacturer#80,price#81] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.n..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<manufacturer:string,price:string>
            :- FileScan csv [manufacturer#117,price#118] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[wasbs://bigdatablobszokolin@bigdatastorageszokolin.blob.core.windows.n..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<manufacturer:str

In [0]:
%sql

SELECT manufacturer, SUM(price) FROM YOUR_TEMP_VIEW_NAME GROUP BY manufacturer

manufacturer,sum(price)
canopus/grass valley,83796.5700000001
punch software,309266.10000000085
symantec,2572.5699999999943
allume systems,6167.590000000013
sage software,96949.71000000012
microsoft,142744.0300000003
netopia,83908.10999999997
sony online entertainment inc,3301.870000000005
adobe systems inc,112774.0900000002
computer associates,119546.57000000028
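The long fractional tails in these sums, for example `83796.5700000001`, come from adding `price` as a `double`. The plan above shows Spark casting the string column with `cast(price as double)`. Binary floating point cannot represent most two-decimal prices exactly, so small rounding errors build up over many additions. The minimal plain-Python example below shows the effect with made-up prices.

```python
from decimal import Decimal

prices = ["0.10"] * 10  # made-up prices, stored as strings like the CSV column

float_total = sum(float(p) for p in prices)      # binary floating point, like SUM over double
decimal_total = sum(Decimal(p) for p in prices)  # exact decimal arithmetic

print(float_total)    # 0.9999999999999999
print(decimal_total)  # 1.00
```

In Spark SQL the corresponding fix would be something like `SUM(CAST(price AS DECIMAL(10,2)))`, which keeps exact cents. `ROUND(SUM(price), 2)` only tidies the displayed result.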



Since this table is registered as a temp view, it will be available only to this notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.

In [0]:
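# mode("overwrite") lets this cell be re-run; the default mode raises an error if the table already exists
df.write.format("parquet").mode("overwrite").saveAsTable("GOOGLE_DATA_PERMANENT_TABLE_bigger")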


This table will persist across cluster restarts and allow various users across different notebooks to query this data.