In [0]:
import json

!pip install pandas pyarrow fastparquet

Collecting fastparquet
  Using cached fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting cramjam>=2.3 (from fastparquet)
  Using cached cramjam-2.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Using cached fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
Using cached cramjam-2.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB)
Installing collected packages: cramjam, fastparquet
Successfully installed cramjam-2.9.0 fastparquet-2024.11.0
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%python
import os
import pandas as pd
from azure.storage.blob import BlobServiceClient

# Initialize the Blob Service Client
connection_string = "account key"
container_name = "group2"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)



# Function for safe JSON parsing
def safe_json_loads(x):
    try:
        return json.loads(x) if isinstance(x, str) else x
    except json.JSONDecodeError:
        return None  # Return None for invalid JSON strings


input_folder = "Decoded data in parquet format/JuneMachineData_decoded"
output_folder = "Normalized data in parquet format/JuneMachineData_normalized"


# Iterate over all files in the input folder
blobs = container_client.list_blobs(name_starts_with=input_folder)

for blob in blobs:
    blob_name = blob.name
    print(f"Processing blob: {blob_name}")
    
    try:
        # Download the blob to a temporary file
        temp_file_path = "temp_file.parquet"
        normalized_file_path = None  # Initialize normalized_file_path

        with open(temp_file_path, "wb") as temp_file:
            blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
            temp_file.write(blob_client.download_blob().readall())

        # Read the Parquet file into a Pandas DataFrame
        df = pd.read_parquet(temp_file_path, engine='fastparquet')

        # Safely parse the 'BodyContent' column and flatten it
        df['BodyContent'] = df['BodyContent'].apply(safe_json_loads)
        body_content_df = pd.json_normalize(df['BodyContent'].explode())
        df_flattened = df.drop(['BodyContent'], axis=1).join(body_content_df)

        # Save the normalized DataFrame locally in Parquet format
        normalized_file_path = f"normalized_{os.path.basename(blob_name)}"
        df_flattened.to_parquet(normalized_file_path, index=False)

        # Define the output blob name
        output_blob_name = f"{output_folder}/{os.path.basename(normalized_file_path)}"

        # Upload the normalized file to the output folder in Blob Storage
        with open(normalized_file_path, "rb") as data:
            container_client.upload_blob(name=output_blob_name, data=data, overwrite=True)

        print(f"File '{normalized_file_path}' successfully uploaded to '{output_blob_name}'.")

    except Exception as e:
        print(f"Error processing {blob_name}: {e}")
    
    finally:
        # Clean up temporary files
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        if normalized_file_path and os.path.exists(normalized_file_path):
            os.remove(normalized_file_path)

print("Processing completed.")

Processing blob: Decoded data in parquet format/JuneMachineData_decoded
Error processing Decoded data in parquet format/JuneMachineData_decoded: [Errno 22] Invalid argument
Processing blob: Decoded data in parquet format/JuneMachineData_decoded/June1.parquet
File 'normalized_June1.parquet' successfully uploaded to 'Normalized data in parquet format/JuneMachineData_normalized/normalized_June1.parquet'.
Processing blob: Decoded data in parquet format/JuneMachineData_decoded/June10.parquet
File 'normalized_June10.parquet' successfully uploaded to 'Normalized data in parquet format/JuneMachineData_normalized/normalized_June10.parquet'.
Processing blob: Decoded data in parquet format/JuneMachineData_decoded/June100.parquet
File 'normalized_June100.parquet' successfully uploaded to 'Normalized data in parquet format/JuneMachineData_normalized/normalized_June100.parquet'.
Processing blob: Decoded data in parquet format/JuneMachineData_decoded/June101.parquet
Error processing Decoded data in p