In [None]:
import os
import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
from pyspark.sql.functions import from_json, col, regexp_extract, when, regexp_replace, concat, lit
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobPrefix
from azure.storage.blob import BlobServiceClient

# --- Job configuration --------------------------------------------------------
container_name = 'aijobs'
storage_name = 'storageacctszymon'
source_folder = "source_files"  # folder (inside the container) holding incoming source files

# Credential shared by Spark (wasbs) and by the Blob SDK in the archive cell.
# Do not hard-code it in the notebook: read it from the environment. The
# placeholder fallback keeps the previous behaviour when the variable is unset.
# TODO generate a SAS token / key in your Azure Storage account and export it as
# AZURE_STORAGE_SAS_TOKEN (or read it from a secret scope).
sas_token = os.environ.get("AZURE_STORAGE_SAS_TOKEN", "0000000")

# --- Spark configuration ------------------------------------------------------
spark = SparkSession.builder.appName("test").getOrCreate()
# The conf key is derived from storage_name, so pointing the job at another
# account only requires changing storage_name above.
# NOTE(review): "fs.azure.account.key.*" expects a storage *account key*. If
# sas_token really holds a SAS token, wasbs expects it under
# f"fs.azure.sas.{container_name}.{storage_name}.blob.core.windows.net" — confirm.
spark.conf.set(f"fs.azure.account.key.{storage_name}.blob.core.windows.net", sas_token)

In [None]:
# Guard: the job processes exactly one new CSV per run. Files already archived
# in the raw_processed / raw_not_processed sub-folders are ignored.
source_dir = f"wasbs://{container_name}@{storage_name}.blob.core.windows.net/{source_folder}/"

file_list = dbutils.fs.ls(source_dir)

# Count only what the read cell will actually load (`*.csv` directly in
# source_dir). Sub-folder entries do not end in ".csv", so they are skipped as well.
csv_files = [
    entry for entry in file_list
    if entry.name.endswith('.csv') and 'processed' not in entry.name
]

if not csv_files:
    raise SystemExit(f"No CSV file found in {source_dir}. Aborting job.")
if len(csv_files) > 1:
    raise SystemExit(f"Multiple files found: {[entry.name for entry in csv_files]}. Aborting job.")

In [None]:
# Explicit schema for the incoming job-offer CSV (the file has a header row).
# NOTE(review): Spark file sources may not enforce nullable=False on read;
# nulls are removed explicitly by dropna() in the transformation cell.
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('senior', BooleanType(), True),
    StructField('company', StringType(), False),
    StructField('location', StringType(), False),
    StructField('salary_range', StringType(), False),
    StructField('currency', StringType(), True),
])

# Reuse source_dir from the guard cell rather than repeating the folder name,
# so both cells always look at the same location.
df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .load(f"{source_dir}*.csv")
)

# Baseline row count, used by the reconciliation check in the last cell.
raw_df_count = df.count()

print(f"No. of rows before processing: {raw_df_count}")

No. of rows before processing: 1534


In [None]:
# Clean and reshape the raw job offers.
# NOTE(review): this cell reassigns `df`. Re-running it without re-running the
# read cell fails, because `salary_range` is no longer in the selected columns.

df = df.dropna()

# Replace non-breaking spaces (U+00A0) with plain spaces. Java regex `\s` does
# not match U+00A0, so thousands separators would otherwise not match below.
df = df.withColumn("salary_range", regexp_replace(col("salary_range"), "\u00A0", " "))

# Salary components: the first space-grouped number is the minimum; the one
# after an en dash (U+2013) is the maximum.
pattern_min = r"(\d{1,3}\s\d{3})"
pattern_max = r"–\s*(\d{1,3}\s\d{3})"


def _extract_salary(pattern):
    """Build a column with the first capture group of `pattern` in salary_range.

    Whitespace is removed from the captured number (e.g. "10 000" -> "10000").
    Rows where the pattern does not occur get the string "N/A".
    """
    return when(
        col("salary_range").rlike(pattern),
        regexp_replace(regexp_extract(col("salary_range"), pattern, 1), r"\s", ""),
    ).otherwise("N/A")


df = df.withColumns(
    {
        "salary_min": _extract_salary(pattern_min),
        "salary_max": _extract_salary(pattern_max),
    }
)

# Locations containing a "+" are reported as "Multiple".
pattern_plus = r"\+"

df = df.withColumn(
    'location',
    when(col("location").rlike(pattern_plus), 'Multiple').otherwise(col('location')),
)

# Stamp each row with the processing date (driver-local, YYYY-MM-DD) and fix the
# output column order. salary_range is intentionally not selected.
df = (
    df.withColumn('date', lit(datetime.today().strftime('%Y-%m-%d')))
    .select("id", "date", "title", "senior", "company", "location", "salary_min", "salary_max", "currency")
)

In [None]:
# Write the transformed rows to a timestamped staging folder under logs/.
# coalesce(1) produces a single part file, which the next cell renames.
datetime_now = datetime.now().strftime('%Y-%m-%dT-%H-%M-%S')
container_url = f"wasbs://{container_name}@{storage_name}.blob.core.windows.net"
path = f"{container_url}/logs/{datetime_now}"  # protocols: wasbs, abfs (blob.core), abfss (dfs.core)

csv_writer = df.coalesce(1).write.mode("overwrite").option("header", "true")
csv_writer.csv(path)


In [None]:
# List the files in the directory
files = dbutils.fs.ls(path)

# Search for the file with the part-00000 prefix
part_file = next((f.path for f in files if f.name.startswith("part-00000")), None)

# Ensure a matching file was found
if part_file:
    # Define the destination path
    destination = f"wasbs://{container_name}@{storage_name}.blob.core.windows.net/transformed_files/ai_jobs_transformed_{datetime_now}.csv"
    
    # Move the file to the desired destination with the new name
    dbutils.fs.mv(part_file, destination)
    print(f"Processed file {part_file} moved to {destination}")
else:
    print("No file with prefix 'part-00000' found.")

Processed file wasbs://aijobs@storageacctszymon.blob.core.windows.net/logs/2024-08-23T-12-15-55/part-00000-tid-5076326388662699174-de2f9b32-e504-4eaa-81b7-d99ac5e2a1aa-6-1-c000.csv moved to wasbs://aijobs@storageacctszymon.blob.core.windows.net/transformed_files/ai_jobs_transformed_2024-08-23T-12-15-55.csv


In [None]:
# Reconcile row counts, then archive the source file:
#   * retained share >= x_percent -> {source_folder}/raw_processed/
#   * otherwise                   -> {source_folder}/raw_not_processed/ and abort.


def _move_blob(container_client, source_name, destination_name, timeout_s=300, poll_s=2):
    """Move a blob within one container: copy it, wait for the copy, delete the source.

    Args:
        container_client: ContainerClient for the container holding both blobs.
        source_name: name of the blob to move.
        destination_name: name of the new blob.
        timeout_s: maximum seconds to wait for a pending server-side copy.
        poll_s: seconds between copy-status polls.

    Raises:
        RuntimeError: if the copy does not reach status "success" (it times out,
            fails or is aborted). The source blob is left in place in that case.
    """
    source_blob_client = container_client.get_blob_client(blob=source_name)
    destination_blob_client = container_client.get_blob_client(blob=destination_name)

    # start_copy_from_url can return while the copy is still "pending".
    # Deleting the source before the copy completes could lose the file.
    status = destination_blob_client.start_copy_from_url(source_blob_client.url)["copy_status"]
    deadline = time.monotonic() + timeout_s
    while status == "pending":
        if time.monotonic() > deadline:
            raise RuntimeError(f"Timed out copying '{source_name}' to '{destination_name}'.")
        time.sleep(poll_s)
        status = destination_blob_client.get_blob_properties().copy.status

    if status != "success":
        raise RuntimeError(f"Copy of '{source_name}' to '{destination_name}' ended with status '{status}'.")

    container_client.delete_blob(source_name)


# Construct the BlobServiceClient using the shared storage credential
blob_service_client = BlobServiceClient(account_url=f"https://{storage_name}.blob.core.windows.net", credential=sas_token)
container_client = blob_service_client.get_container_client(container_name)

# Candidate source files: CSVs directly under source_folder. Archived files live
# in the raw_processed / raw_not_processed sub-folders and are excluded.
prefix = source_folder + '/'
csv_files = [
    blob.name
    for blob in container_client.list_blobs(name_starts_with=prefix)
    if blob.name.endswith('.csv')
    and '/' not in blob.name[len(prefix):]
    and 'processed' not in blob.name
]

if len(csv_files) != 1:
    raise SystemExit(f"Expected exactly one source file under '{prefix}', found {csv_files}. Aborting job.")
csv_file = csv_files[0]

# Print source file path for debugging
print(f"Source file: {csv_file}")

processed_df_count = df.count()

x_percent = 50  # minimum share of raw rows (in %) that must survive processing

# An empty source counts as 0% retained, instead of raising ZeroDivisionError.
retained_percent = (processed_df_count / raw_df_count) * 100 if raw_df_count else 0.0

file_name = csv_file.split('/')[-1]

if retained_percent < x_percent:
    print(f'Reconciliation error: Over {100 - x_percent} percent of rows got removed in processing. ({raw_df_count - processed_df_count} out of {raw_df_count}) Moving file to not_processed folder & aborting job')
    destination_blob_name = f"{source_folder}/raw_not_processed/{file_name}"
    _move_blob(container_client, csv_file, destination_blob_name)
    raise SystemExit(f"File Not Processed. File '{csv_file}' moved to '{destination_blob_name}'. Job aborted.")

destination_blob_name = f"{source_folder}/raw_processed/{file_name}"
_move_blob(container_client, csv_file, destination_blob_name)
print(f"Processing Completed. File '{csv_file}' moved to '{destination_blob_name}'. Job completed.")

Source file: source_files/jobs_combined.csv
Processing Completed. File 'source_files/jobs_combined.csv' moved to 'source_files/raw_processed/jobs_combined.csv'. Job completed.
