In [None]:
# Install necessary packages
%pip install azure-storage-blob pandas pyarrow

In [0]:
# At the beginning of your notebook, add:
%pip install python-dotenv
%python

import os
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient
import pandas as pd
from io import BytesIO

# Load environment variables (python-dotenv) so the os.getenv calls below can see them
load_dotenv()

# Azure Blob Storage details, read from the environment.
# NOTE(review): os.getenv returns None for an unset variable; a missing account name
# would be interpolated into the account URL below as the literal text "None".
account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
account_key = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')

# Blob service client for the storage account, authenticated with the account key
blob_service_client = BlobServiceClient(
    f"https://{account_name}.blob.core.windows.net",
    credential=account_key
)

# Get the container client; every list/download/upload in this notebook goes through it
container_client = blob_service_client.get_container_client(container_name)

# Directories for Bronze (input) and Silver (output) folders
bronze_folder = "Bronze/"
silver_folder = "Silver/"

# Maps each blob-name prefix (relative to the Bronze folder) to the suffixes that are
# merged separately. None means every blob under the prefix goes into a single output.
filters = {
    "matches": None,
    "standings": None,
    "scorers": None,
    "teams": ["Team", "Squad", "RunningCompitition"]  # Matched against blob names as "_<suffix>.parquet"; keep the spelling as-is
}

# Function to get blobs by prefix
def get_blobs_by_prefix(prefix, suffix=None):
    """List blob names in the Bronze folder that start with ``prefix``.

    Args:
        prefix: Text appended to ``bronze_folder`` to form the listing prefix.
        suffix: Optional file suffix. When given (and non-empty), only names
            ending in ``_<suffix>.parquet`` are kept.

    Returns:
        A list of blob names (strings).
    """
    listing = container_client.list_blobs(name_starts_with=bronze_folder + prefix)
    names = [item.name for item in listing]
    if not suffix:
        return names
    # Keep only the blobs whose name ends with the requested suffix
    wanted_ending = f"_{suffix}.parquet"
    return [name for name in names if name.endswith(wanted_ending)]

# Function to download and merge files
def merge_files(filter_name, suffix=None):
    """Concatenate all matching Bronze parquet blobs and upload the result to Silver.

    Args:
        filter_name: Blob-name prefix passed to ``get_blobs_by_prefix``.
        suffix: Optional file suffix; when given, only blobs ending in
            ``_<suffix>.parquet`` are merged and the suffix is included in the
            output file name.

    Returns:
        None. Progress is reported with ``print``; when no blobs match, nothing
        is uploaded.
    """
    if not suffix:
        print(f"Processing filter '{filter_name}'...")
        file_paths = get_blobs_by_prefix(filter_name)
        output_suffix = ""
    else:
        print(f"Processing filter '{filter_name}' with suffix '{suffix}'...")
        file_paths = get_blobs_by_prefix(filter_name, suffix)
        output_suffix = f"_{suffix}"

    # Download every blob and parse it straight from memory into a DataFrame
    frames = [
        pd.read_parquet(
            BytesIO(container_client.get_blob_client(path).download_blob().readall())
        )
        for path in file_paths
    ]

    if not frames:
        print(f"No files found for filter: {filter_name}{output_suffix}")
        return

    # Stack the frames with a fresh row index
    merged = pd.concat(frames, ignore_index=True)

    # Serialize to an in-memory parquet buffer and rewind it before uploading
    output_file = f"{silver_folder}{filter_name}{output_suffix}_merged.parquet"
    buffer = BytesIO()
    merged.to_parquet(buffer, index=False)
    buffer.seek(0)

    # Upload the merged file, replacing any existing blob of the same name
    container_client.get_blob_client(output_file).upload_blob(buffer, overwrite=True)
    print(f"Merged file saved as: {output_file}")

# Merge files for each filter
for filter_name, suffixes in filters.items():
    if not suffixes:
        # No suffix list: merge everything under this prefix into one file
        merge_files(filter_name)
        continue
    # One merged output per listed suffix
    for suffix in suffixes:
        merge_files(filter_name, suffix)


Processing filter 'matches'...
Merged file saved as: Silver/matches_merged.parquet
Processing filter 'standings'...
Merged file saved as: Silver/standings_merged.parquet
Processing filter 'scorers'...
Merged file saved as: Silver/scorers_merged.parquet
Processing filter 'teams' with suffix 'Team'...
Merged file saved as: Silver/teams_Team_merged.parquet
Processing filter 'teams' with suffix 'Squad'...
Merged file saved as: Silver/teams_Squad_merged.parquet
Processing filter 'teams' with suffix 'RunningCompitition'...
Merged file saved as: Silver/teams_RunningCompitition_merged.parquet


In [0]:
from azure.storage.blob import BlobServiceClient
from io import BytesIO

# Azure Blob Storage details
# NOTE(review): this cell does not import `os`; it relies on the import made in an earlier cell.
account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
account_key = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')

# Blob service client
blob_service_client = BlobServiceClient(
    f"https://{account_name}.blob.core.windows.net",
    credential=account_key
)

# Merged files in the Silver folder; later cells refer to these by list position
file_paths = [
    "Silver/matches_merged.parquet",
    "Silver/scorers_merged.parquet",
    "Silver/standings_merged.parquet",
    "Silver/teams_RunningCompitition_merged.parquet",
    "Silver/teams_Squad_merged.parquet",
    "Silver/teams_Team_merged.parquet"
]

# Get the container client (individual blob clients are created from it per file)
container_client = blob_service_client.get_container_client(container_name)

# Function to load and display DataFrame
def load_and_display(file_path):
    """Download a parquet blob, render it with ``display``, and return it.

    Args:
        file_path: Blob name inside the container.

    Returns:
        The blob contents parsed into a pandas DataFrame.
    """
    raw_bytes = container_client.get_blob_client(file_path).download_blob().readall()
    frame = pd.read_parquet(BytesIO(raw_bytes))
    display(frame)
    return frame

# Load and display each DataFrame.
# The numeric indices refer to positions in the file_paths list, so the two must stay in sync.
df_matches = load_and_display(file_paths[0])
df_scorers = load_and_display(file_paths[1])
df_standings = load_and_display(file_paths[2])
df_teams_running_competition = load_and_display(file_paths[3])
df_teams_squad = load_and_display(file_paths[4])
df_teams_team = load_and_display(file_paths[5])

In [0]:
# Reload the running-competition teams file so this cell always starts from the stored data
df_teams_running_competition = load_and_display(file_paths[3])
# Drop rows that repeat across every column other than 'index'. The column list is
# taken from the frame loaded just above, so the cell no longer reads a variable
# left behind by an earlier cell.
df_teams_running_competition = df_teams_running_competition.drop_duplicates(
    subset=df_teams_running_competition.columns.difference(['index'])
)
# set_index followed by reset_index moves 'id' to the first column and rebuilds
# a fresh 0..n-1 row index after the de-duplication
df_teams_running_competition = df_teams_running_competition.set_index('id')
df_teams_running_competition = df_teams_running_competition.reset_index()
display(df_teams_running_competition)

In [0]:
%python
# Assuming df_teams_team is a pandas DataFrame

# 1. Delete specified columns by position.
# NOTE(review): the positions are resolved against the frame's current column order and
# the result is assigned back to df_teams_team, so running this cell a second time would
# drop a different set of columns (or fail if too few columns remain).
columns_to_drop = [4, 6, 8, 11, 16,19]
df_teams_team = df_teams_team.drop(df_teams_team.columns[columns_to_drop], axis=1)

# Define functions for the transformations
def extract_id(x):
    """Return ``x['id']``, or None when the cell holds no value.

    A cell counts as empty when it is None or a float NaN (the marker pandas
    uses for missing entries); indexing a NaN would otherwise raise TypeError.

    Args:
        x: A mapping with an 'id' key, None, or NaN.

    Returns:
        The value stored under 'id', or None for an empty cell.
    """
    # `x != x` is true only for NaN
    if x is None or (isinstance(x, float) and x != x):
        return None
    return x['id']

def extract_season(x):
    """Return ``x['season']``, or None when the cell holds no value.

    A cell counts as empty when it is None or a float NaN (the marker pandas
    uses for missing entries); indexing a NaN would otherwise raise TypeError.

    Args:
        x: A mapping with a 'season' key, None, or NaN.

    Returns:
        The value stored under 'season', or None for an empty cell.
    """
    # `x != x` is true only for NaN
    if x is None or (isinstance(x, float) and x != x):
        return None
    return x['season']

def extract_ids(x):
    """Collect the 'id' of every entry in ``x``.

    A cell counts as empty when it is None or a float NaN (the marker pandas
    uses for missing entries); iterating a NaN would otherwise raise TypeError.

    Args:
        x: An iterable of mappings that each have an 'id' key, None, or NaN.

    Returns:
        A list of the 'id' values in input order; an empty list for an empty cell.
    """
    # `x != x` is true only for NaN
    if x is None or (isinstance(x, float) and x != x):
        return []
    return [entry['id'] for entry in x]

# 2. Extract 'id' from the 'competition' column into a new 'competition_id' column,
#    then drop the nested 'competition' column
df_teams_team['competition_id'] = df_teams_team['competition'].apply(extract_id)
df_teams_team = df_teams_team.drop(columns=['competition'])

# 3. Extract the 'season' entry from the 'filters' column into a new 'Season' column,
#    then drop 'filters'
df_teams_team['Season'] = df_teams_team['filters'].apply(extract_season)
df_teams_team = df_teams_team.drop(columns=['filters'])

# 4. Extract 'id' from the 'area' column into 'area_id', then drop 'area'
df_teams_team['area_id'] = df_teams_team['area'].apply(extract_id)
df_teams_team = df_teams_team.drop(columns=['area'])

# 5. Extract the 'id' of every entry in 'runningCompetitions' into a list column,
#    then drop 'runningCompetitions'
df_teams_team['runningCompetitions_ids'] = df_teams_team['runningCompetitions'].apply(extract_ids)
df_teams_team = df_teams_team.drop(columns=['runningCompetitions'])

display(df_teams_team)


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
%python
%pip install fsspec

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
# Set up the Azure Blob Storage account details
account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
account_key = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')

# Configure Spark to access Azure Blob Storage with the account key
spark.conf.set(f"fs.azure.account.key.{account_name}.blob.core.windows.net", account_key)

# Expand the dictionaries in the 'season' column into one column per key,
# then attach the owning team's id and Season value (assignment aligns on the row index)
df_season_details = df_teams_team['season'].apply(pd.Series)
df_season_details['team_id'] = df_teams_team['id']
df_season_details['Season'] = df_teams_team['Season']

# The 'winner' key of the season dictionaries is not kept
df_season_details = df_season_details.drop(columns=['winner'])

# Assuming df_season_details is a pandas DataFrame, convert it to a PySpark DataFrame
df_season_details_spark = spark.createDataFrame(df_season_details)

# Write the PySpark DataFrame to a temporary folder in Azure Blob Storage.
# coalesce(1) reduces the frame to one partition before the write.
df_season_details_spark.coalesce(1).write.mode("overwrite").parquet(
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_season_details_temp"
)

# Move the .parquet file out of the temporary folder to its final, fixed name.
# `file_system` is also used by later cells, which do not define it themselves.
# NOTE(review): the temporary folder itself is not removed by this cell.
file_system = dbutils.fs
files = file_system.ls(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_season_details_temp")
for file in files:
    if file.path.endswith(".parquet"):
        file_system.mv(file.path, f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_season_details.parquet")

# To display the DataFrame, you can still use display()
display(df_season_details_spark)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
# Function to rename columns by removing the prefix
def rename_columns(df, prefix):
    """Return a copy of ``df`` with ``prefix`` stripped from the start of column names.

    Only the leading occurrence is removed; if the same text appears again
    later in a column name it is left intact. Columns that do not start with
    ``prefix`` (and non-string column labels) are left unchanged.

    Args:
        df: The pandas DataFrame whose columns should be renamed.
        prefix: The leading text to strip from matching column names.

    Returns:
        A new DataFrame with the renamed columns; ``df`` itself is not modified.
    """
    prefix_length = len(prefix)
    new_columns = {
        col: col[prefix_length:]
        for col in df.columns
        if isinstance(col, str) and col.startswith(prefix)
    }
    return df.rename(columns=new_columns)

# Load the merged matches file (this also displays it), drop unwanted columns,
# strip the flattened-path prefix from the remaining column names, and display the result
df_matches = load_and_display(file_paths[0])
# Several column names carry the literal text matches'][0][' in front of the field name
columns_to_drop = [
    'flag', "matches'][0]['competition_emblem", "matches'][0]['homeTeam_crest", 
    "matches'][0]['awayTeam_crest", 'msg', 'count', 'first', 'last', 
    'competition_id', 'competition_name', 'competition_code', 
    'competition_type', 'competition_emblem'
]
df_matches = df_matches.drop(columns=columns_to_drop)
df_matches = rename_columns(df_matches, "matches'][0]['")
display(df_matches)

# Convert to PySpark DataFrame
df_matches_spark = spark.createDataFrame(df_matches)

# Write the PySpark DataFrame to a temporary folder in Azure Blob Storage.
# coalesce(1) reduces the frame to one partition before the write.
df_matches_spark.coalesce(1).write.mode("overwrite").parquet(
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_matches_temp"
)

# Move the .parquet file out of the temporary folder to its final, fixed name
# NOTE(review): the temporary folder itself is not removed by this cell.
files = dbutils.fs.ls(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_matches_temp")
for file in files:
    if file.path.endswith(".parquet"):
        dbutils.fs.mv(file.path, f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_matches.parquet")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
# Extract dictionary values from 'coach' column and create separate columns for each key,
# then attach the owning team's id, Season and name (assignment aligns on the row index)
df_coach_details = df_teams_team['coach'].apply(pd.Series)
df_coach_details['team_id'] = df_teams_team['id']
df_coach_details['Season'] = df_teams_team['Season']
df_coach_details['TeamName'] = df_teams_team['name']

# Expand the 'contract' dictionaries into their own columns and attach them
# side by side in place of the original 'contract' column
df_contract_details = df_coach_details['contract'].apply(pd.Series)
df_coach_details = pd.concat([df_coach_details.drop(columns=['contract']), df_contract_details], axis=1)

# Delete any row with null 'name'
df_coach_details = df_coach_details.dropna(subset=['name'])

# Reorder columns: the five identity columns first, everything else in its existing order
columns_order = ['id', 'name', 'firstName', 'lastName', 'dateOfBirth'] + [col for col in df_coach_details.columns if col not in ['id', 'name', 'firstName', 'lastName', 'dateOfBirth']]
df_coach_details = df_coach_details[columns_order]

# Convert to PySpark DataFrame
df_coach_details_spark = spark.createDataFrame(df_coach_details)

# Write the PySpark DataFrame to a temporary folder in Azure Blob Storage.
# coalesce(1) reduces the frame to one partition before the write.
df_coach_details_spark.coalesce(1).write.mode("overwrite").parquet(
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_coach_details_temp"
)

# Move the .parquet file out of the temporary folder to its final, fixed name.
# NOTE(review): `file_system` is not defined in this cell; it must already exist in the session.
files = file_system.ls(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_coach_details_temp")
for file in files:
    if file.path.endswith(".parquet"):
        file_system.mv(file.path, f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_coach_details.parquet")

display(df_coach_details_spark)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
# Turn each entry of the 'squad' lists into its own row (explode), then expand the
# per-entry dictionaries into one column per key
df_squad_details = df_teams_team['squad'].explode().apply(pd.Series)
# Attach the owning team's id, Season and name.
# NOTE(review): this relies on the exploded rows keeping their team's row label so the
# assignments line up by index — confirm df_teams_team has a unique index at this point.
df_squad_details['team_id'] = df_teams_team['id']
df_squad_details['Season'] = df_teams_team['Season']
df_squad_details['TeamName'] = df_teams_team['name']

# Reorder columns to have 'id' and 'name' in the first positions
columns_order = ['id', 'name'] + [col for col in df_squad_details.columns if col not in ['id', 'name']]
df_squad_details = df_squad_details[columns_order]

# Convert to PySpark DataFrame
df_squad_details_spark = spark.createDataFrame(df_squad_details)

# Write the PySpark DataFrame to a temporary folder in Azure Blob Storage.
# coalesce(1) reduces the frame to one partition before the write.
df_squad_details_spark.coalesce(1).write.mode("overwrite").parquet(
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_squad_details_temp"
)

# Move the .parquet file out of the temporary folder to its final, fixed name.
# NOTE(review): `file_system` is not defined in this cell; it must already exist in the session.
files = file_system.ls(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_squad_details_temp")
for file in files:
    if file.path.endswith(".parquet"):
        file_system.mv(file.path, f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_squad_details.parquet")

display(df_squad_details_spark)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage

In [0]:
# Keep only the ids of the nested 'season' and 'coach' dictionaries (None when the cell
# is not a dict), then drop the nested columns, including 'squad'.
# NOTE(review): this rebinds df_teams_team without 'squad', 'coach' and 'season', so the
# cells that read those columns must run before this one.
df_teams_team['season_id'] = df_teams_team['season'].apply(lambda x: x['id'] if isinstance(x, dict) else None)
df_teams_team['coach_id'] = df_teams_team['coach'].apply(lambda x: x['id'] if isinstance(x, dict) else None)
df_teams_team = df_teams_team.drop(columns=['squad', 'coach', 'season'])
# Put 'id', 'Season' and 'name' first; the other columns keep their existing order
columns_order = ['id', 'Season', 'name'] + [col for col in df_teams_team.columns if col not in ['id', 'Season', 'name']]
df_teams_team = df_teams_team[columns_order]

# Convert to PySpark DataFrame
df_teams_team_spark = spark.createDataFrame(df_teams_team)

# Write the PySpark DataFrame to a temporary folder in Azure Blob Storage.
# coalesce(1) reduces the frame to one partition before the write.
df_teams_team_spark.coalesce(1).write.mode("overwrite").parquet(
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_teams_team_temp"
)

# Move the .parquet file out of the temporary folder to its final, fixed name.
# NOTE(review): `file_system` is not defined in this cell; it must already exist in the session.
files = file_system.ls(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_teams_team_temp")
for file in files:
    if file.path.endswith(".parquet"):
        file_system.mv(file.path, f"wasbs://{container_name}@{account_name}.blob.core.windows.net/Silver/df_teams_team.parquet")

display(df_teams_team_spark)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1311)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:1028)
	at com.databricks.logging.Usage