In [None]:
!pip install azure-storage-blob

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, row_number
from pyspark.sql.window import Window

In [None]:
from azure.storage.blob import BlobServiceClient

PART 1: IMPORT DATA

In [None]:
# Databricks notebook source
# Input settings for the source blob; fill these in before running the cells below.
# Storage account name: becomes the host part of the wasbs URL and of the SAS config key.
blob_account_name = ""
# Container name: placed before the '@' in the wasbs URL and used in the SAS config key.
blob_container_name = ""
# Path of the input CSV inside the container (appended to the wasbs URL).
blob_relative_path = ""
# NOTE(review): presumably the intended output path; it is not referenced by any
# of the visible cells -- confirm whether it is still needed.
blob_relative_path_out = ""
# SAS token registered in the Spark configuration for this container/account.
# Raw string literal, so backslashes in a pasted value are not treated as escapes.
# NOTE(review): avoid committing a real token in the notebook.
blob_sas_token = r""

In [None]:
# Storage host shared by the blob URL and by the SAS configuration key.
blob_host = f'{blob_account_name}.blob.core.windows.net'

# wasbs URL layout: <container>@<account host>/<relative path>
wasbs_path = f'wasbs://{blob_container_name}@{blob_host}/{blob_relative_path}'

# Register the SAS token under the container/account-specific Spark config key.
spark.conf.set(f'fs.azure.sas.{blob_container_name}.{blob_host}', blob_sas_token)
print(f'Remote blob path: {wasbs_path}')

In [None]:
# Load the CSV at the remote path, treating its first line as column names.
# No schema is supplied and schema inference is not enabled, so Spark reads
# every column as a string.
df = spark.read.csv(wasbs_path, header=True)

# Expose the DataFrame to Spark SQL under the view name 'source'.
print('Register the DataFrame as a SQL temporary view: source')
df.createOrReplaceTempView('source')

PART 2: DO SOME TRANSFORMATIONS

In [None]:
# NOTE: `df` is read from CSV without a schema, so `stars` and `forks` are string
# columns. Spark coerces them to numbers for the comparison, the division and
# avg() below, but sorting a string column is lexicographic ("999" would sort
# above "10000"), so the ranking step orders by an explicit numeric cast.

# Filter repositories with more than 1000 stars
filtered_df = df.filter(df.stars > 1000)

# Calculate a new column for the ratio of forks to stars # df1 --> df_with_ratio
# (computed on the filtered rows, i.e. only where stars > 1000)
df_with_ratio = filtered_df.withColumn("forks_to_stars_ratio", col("forks") / col("stars"))

# Group by language and calculate average stars and forks # df2 --> agg_df
# (computed over the full, unfiltered DataFrame)
agg_df = df.groupBy("language").agg(avg("stars").alias("avg_stars"), avg("forks").alias("avg_forks"))

# Rank repositories within each language group based on stars # df3 --> ranked_within_group_df
# Order by the numeric value of `stars`, highest first. The cast is only used for
# ordering; the `stars` column in the output keeps its original values.
# row_number() assigns distinct consecutive ranks, so repositories with equal
# stars are ranked in an arbitrary order relative to each other.
stars_numeric = col("stars").cast("double")
windowSpec = Window.partitionBy("language").orderBy(stars_numeric.desc())
ranked_within_group_df = df.withColumn("rank_within_group", row_number().over(windowSpec))

PART 3: WRITE BACK

In [None]:
# Destination storage settings; fill these in before running the upload cells.
account_name = ""       # not referenced by the visible cells
container_name = ""     # container that receives the uploaded CSV blobs
connection_string = ""  # NOTE(review): keep real connection strings out of the notebook

# Build the service client from the Azure Storage connection string.
blob_service_client = BlobServiceClient.from_connection_string(conn_str=connection_string)

In [None]:
def _to_csv_text(spark_df):
    """Collect a Spark DataFrame to pandas and return it as CSV text without the index."""
    return spark_df.toPandas().to_csv(index=False)


# CSV text for each result DataFrame. toPandas() brings all rows to the driver,
# so each result must fit in driver memory.
filtered_content = _to_csv_text(filtered_df)
ratio_content = _to_csv_text(df_with_ratio)
agg_content = _to_csv_text(agg_df)
ranked_content = _to_csv_text(ranked_within_group_df)

In [None]:
# Target blob name paired with the CSV text to store under it, in upload order.
uploads = (
    ("FilteredDataFrame.csv", filtered_content),
    ("DataFrameRatio.csv", ratio_content),
    ("AggregatedDataFrame.csv", agg_content),
    ("RankedDataFrame.csv", ranked_content),
)

for blob_name, csv_text in uploads:
    # overwrite=True replaces any blob already stored under the same name.
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    blob_client.upload_blob(csv_text, overwrite=True)

print("Content uploaded to the blob.")