# Reading Data

## Spark session creation

### Blob credentials at session creation

Azure SAS tokens can be passed directly in the Spark configuration when the session is created.

In [None]:
# SAS tokens
import sys
from pyspark.sql import SparkSession
from azure.storage.blob import ContainerClient
from pathlib import Path

sys.path.insert(0, "..")
from blob_credentials import facts_sas_token, facts_container, workspace_sas_token, workspace_container

In [None]:
# Personal prefix: used in the Spark application name here and to filter
# workspace blobs in list_my_files().
myname = "Leo"

# Build (or reuse) the Spark session with the Azure Blob Storage settings.
# NOTE: the executor-count property is spelled "spark.executor.instances";
# the singular spelling is not a Spark setting and is silently ignored.
spark = (
    SparkSession.builder
    .appName(f"Test-{myname}")
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "512m")
    # hadoop-azure provides the NativeAzureFileSystem class referenced below.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.1.1")
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    # One SAS token per container of the "hecdf" storage account.
    .config(f"fs.azure.sas.{facts_container}.hecdf.blob.core.windows.net", facts_sas_token)
    .config(f"fs.azure.sas.{workspace_container}.hecdf.blob.core.windows.net", workspace_sas_token)
    .getOrCreate()
)


## Define your blob services to access files on Azure Blob Storage

In [None]:
from azure.storage.blob import ContainerClient

# Endpoint of the storage account that hosts both containers.
account_url = "https://hecdf.blob.core.windows.net"

# One client per container, each authenticated with that container's SAS token.
facts_blob_service = ContainerClient(
    account_url,
    facts_container,
    credential=facts_sas_token,
)
workspace_blob_service = ContainerClient(
    account_url,
    workspace_container,
    credential=workspace_sas_token,
)

# List files from your workspace

In [None]:
# Personal listing: remember that your files must carry your name prefix.
def list_my_files():
    """Print the name of each workspace blob returned for the ``myname`` prefix."""
    my_entries = list(workspace_blob_service.list_blobs(myname))
    for entry in my_entries:
        print(entry.name)


# Group-wide listing: no prefix filter is applied.
def list_group_files():
    """Print the name of every blob listed in the workspace container."""
    group_entries = list(workspace_blob_service.list_blobs())
    for entry in group_entries:
        print(entry.name)


# Facts listing: same idea, against the facts container.
def list_facts_files():
    """Print the name of every blob listed in the facts container."""
    facts_entries = list(facts_blob_service.list_blobs())
    for entry in facts_entries:
        print(entry.name)


# Show what is available in the facts container.
list_facts_files()

# Copy a file from/to your container

In [None]:
# Materialise the listing once; `blobs` stays available to later cells.
blobs = list(facts_blob_service.list_blobs())

for blob in blobs:
    # Keep only the part of the blob name after the last "/".
    print(blob.name.rsplit("/", 1)[-1])

In [None]:
# blobs = list(facts_blob_service.list_blobs())


def get_name(file_path):
    """Return the last ``/``-separated segment of ``file_path.name``."""
    return file_path.name.split('/')[-1]


def download_blob(blob_path, destination_dir, blob_service=None):
    """
    Download a single blob into ``destination_dir``.

    The local file is named after the last ``/``-separated segment of the
    blob name, so any "folder" part of the blob name is dropped.

    Args:
        blob_path: Object exposing the blob's full name as ``.name`` (for
            example an item yielded by ``list_blobs()``).
        destination_dir: Local directory receiving the file; created if
            it does not exist.
        blob_service: Container client to download from. Defaults to the
            module-level ``facts_blob_service`` so existing two-argument
            calls keep working.
    """
    if blob_service is None:
        blob_service = facts_blob_service

    target = Path(destination_dir) / get_name(blob_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    # Address the blob by its name string rather than by the listing object.
    # Start the download before opening the local file, so a failed request
    # does not leave an empty file behind.
    download_stream = blob_service.get_blob_client(blob_path.name).download_blob()
    with open(target, "wb") as data:
        # Write straight into the file instead of buffering the whole blob.
        download_stream.readinto(data)


def download_blobs(facts_blob_service, destination_dir):
    """
    Download every blob listed by ``facts_blob_service``.

    Args:
        facts_blob_service: Container client used both to list and to
            download the blobs.
        destination_dir: Local directory receiving the files.
    """
    for blob in facts_blob_service.list_blobs():
        # Pass the client along so the download uses the same container
        # that produced the listing.
        download_blob(blob, destination_dir, facts_blob_service)

In [None]:
# Local folder, relative to the working directory, that receives the files.
destination_dir = "../data"
download_blobs(
    facts_blob_service=facts_blob_service,
    destination_dir=destination_dir,
)

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()