In [None]:
%pip install azure-storage-blob


In [None]:
from pyspark.sql import SparkSession
from azure.storage.blob import BlobServiceClient, ContentSettings
import os
from io import BytesIO
import pandas

# Blob Storage account details -- replace these with your own account/container/folders.
account_name = "wu3storage"
# Do not commit a real key to the notebook: read it from the AZURE_STORAGE_KEY
# environment variable (the name the Azure CLI uses). The placeholder is kept as a
# fallback so the notebook behaves as before when the variable is not set.
access_key = os.environ.get("AZURE_STORAGE_KEY", "your-access-key")
container_name = "datalake"
input_folder = "landing"
output_folder = "process"

# Configure the Spark session to read Blob Storage through the wasbs:// scheme.
# NOTE(review): the hadoop-azure version should match the cluster's Hadoop version -- confirm.
# NOTE(review): spark.jars.packages presumably has no effect if a SparkContext is already
# running (e.g. a managed notebook), since getOrCreate() then reuses it -- confirm.
spark = (
    SparkSession.builder
    .appName("AzureBlobStorage")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.2.0")
    # Hadoop resolves a scheme's implementation from fs.<scheme>.impl; the previous
    # "fs.azure" key is not a real setting. wasbs:// is served by the Secure subclass.
    .config("spark.hadoop.fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure")
    # Shared-key credential for this storage account.
    .config(f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net", access_key)
    .getOrCreate()
)


# Names of the Parquet tables to read from the landing folder (and later write to the process folder).
tables = ["processedArtists", "processedListeningHistory", "processedSongs", "processedUserPreferences", "processedUsers"]

# wasbs:// prefix shared by every input table; computed once instead of per table.
input_base = f"wasbs://{container_name}@{account_name}.blob.core.windows.net/{input_folder}"

# Read each table into a Spark DataFrame. Order is preserved, so dfs[i] belongs to tables[i].
dfs = [spark.read.parquet(f"{input_base}/{table}.parquet") for table in tables]
    
blob_service = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=access_key)
# Every table goes to the same container, so build its client once rather than per table.
container_client = blob_service.get_container_client(container_name)

# Clean each DataFrame and upload it as a single Parquet blob under output_folder.
# zip() pairs each DataFrame with its table name instead of indexing tables[i].
for i, (table, df) in enumerate(zip(tables, dfs), start=1):
    print(f"Processing DataFrame {i}")

    # Remove rows that contain a null in any column
    df = df.dropna()

    # toPandas() collects the whole table onto the Spark driver, so each table
    # must fit in driver memory.
    pdf = df.toPandas()

    # Serialize to Parquet in memory and upload the bytes as one blob
    with BytesIO() as temp_file:
        pdf.to_parquet(temp_file, engine='pyarrow')
        temp_file.seek(0)

        output_path = f"{output_folder}/{table}.parquet"
        blob_client = container_client.get_blob_client(output_path)
        blob_client.upload_blob(temp_file, overwrite=True)

    # Drop the pandas copy before the next toPandas() so two tables are never
    # held on the driver at the same time.
    del pdf

    print(f"DataFrame {i} processed successfully!")

print("All DataFrames processed successfully!")