In [0]:
# Locations for the CSV -> Parquet conversion.
# - sourceFileUrl: directory in the workspace volume that spark.read.load(...) reads the CSV data from
#   (it is a directory path, not a single file).
# - targetParquetFilePath: directory the final cell writes its output into.
# NOTE(review): the output directory is nested inside the input directory. If the CSV read lists the
# source directory recursively, a re-run may pick up previously written output files as "CSV" input —
# confirm, and consider moving the output outside the source directory.

sourceFileUrl = "/Volumes/workspace/default/lakehouse"  # Source directory holding the CSV data
targetParquetFilePath = f"{sourceFileUrl}/parquet/"  # Output directory (sub-directory of the source)

In [0]:
# Exploratory read #1: load the CSV data with the reader's defaults, i.e. no header row and
# no schema inference, so columns get the default names _c0, _c1, ... and are read as strings.
# sourceCSVfileDF is reassigned by the later read cells; this result is only for inspection.

sourceCSVfileDF = spark.read.format("csv").load(sourceFileUrl)

In [0]:
# Exploratory read #2: same source, but the first row supplies the column names.
# inferSchema is not set, so every column is still read as a string.

sourceCSVfileDF = (
    spark.read
    .format("csv")
    .option("header", True)  # take column names from the first row
    .load(sourceFileUrl)
)

In [0]:
# Explicit schema for the pricing CSV, used by the typed read in the next cell.
# Explicit imports (instead of `from pyspark.sql.types import *`) make it clear where each
# name comes from and keep the notebook namespace clean.
from pyspark.sql.types import DecimalType, IntegerType, StringType, StructField, StructType

# NOTE(review): DATE_OF_PRICING and the three *_PRICE columns are kept as strings; they must be
# cast (e.g. to DateType / DecimalType) before any date or numeric work — TODO confirm source format.
sourceCSVFileSchema = StructType([
    StructField("DATE_OF_PRICING", StringType(), True),
    StructField("ROW_ID", IntegerType(), True),
    StructField("STATE_NAME", StringType(), True),
    StructField("MARKET_NAME", StringType(), True),
    StructField("PRODUCTGROUP_NAME", StringType(), True),
    StructField("PRODUCT_NAME", StringType(), True),
    StructField("VARIETY", StringType(), True),
    StructField("ORIGIN", StringType(), True),
    StructField("ARRIVAL_IN_TONNES", DecimalType(10, 2), True),  # 10 digits total, 2 after the point
    StructField("MINIMUM_PRICE", StringType(), True),
    StructField("MAXIMUM_PRICE", StringType(), True),
    StructField("MODAL_PRICE", StringType(), True),
])

In [0]:
# Typed read: load the CSV data with the explicit sourceCSVFileSchema from the previous cell.
# inferSchema is deliberately not passed: when a schema is supplied via .schema(...), Spark uses it
# as-is and never runs inference, so the previous inferSchema=True was a no-op with a misleading comment.
# With header=True and an explicit schema, the header row is skipped and the schema is applied by
# column position (enforceSchema defaults to true), not matched against the header names.

sourceCSVfileDF = (
    spark
    .read
    .schema(sourceCSVFileSchema)  # Explicit column names and types
    .load(
        sourceFileUrl,            # Source directory containing the CSV data
        format="csv",             # Parse the files as CSV
        header=True,              # First row is a header line; skip it instead of reading it as data
    )
)

In [0]:
from pyspark.sql.functions import col  # Column-reference helper

# Derive ARRIVAL_IN_KG from ARRIVAL_IN_TONNES (1 tonne = 1000 kg).
# withColumn returns a new DataFrame; sourceCSVfileDF itself is not modified.
sourceCSVfileTransDF = sourceCSVfileDF.withColumn("ARRIVAL_IN_KG", col("ARRIVAL_IN_TONNES") * 1000)

In [0]:
# Render the transformed DataFrame with the notebook's display() helper.
# - Output includes the derived ARRIVAL_IN_KG column next to the original ARRIVAL_IN_TONNES.
display(sourceCSVfileTransDF)

In [0]:
# Use a namespaced alias instead of `from pyspark.sql.functions import *`. The wildcard rebinds
# Python builtins such as sum, min, max, abs and round to their Spark Column versions for every
# later cell in the notebook, which silently breaks ordinary Python code like sum([1, 2]).
from pyspark.sql import functions as F

# Total arrival quantity (kg) for each (product, state) pair, largest totals first.
display(
    sourceCSVfileTransDF
    .groupBy("PRODUCT_NAME", "STATE_NAME")                     # One group per product and state
    .agg(F.sum("ARRIVAL_IN_KG").alias("TOTAL_ARRIVAL_IN_KG"))  # Sum kilograms within each group
    .orderBy(F.col("TOTAL_ARRIVAL_IN_KG").desc())              # Highest totals first
)

In [0]:
# Write sourceCSVfileDF as Parquet files under targetParquetFilePath.
# - .format("parquet") is explicit: .save() without a format uses the session's default data source
#   (spark.sql.sources.default), which on Databricks is Delta, not Parquet.
# - .mode("overwrite") replaces whatever is already stored at the target path.
# NOTE(review): this writes sourceCSVfileDF, not sourceCSVfileTransDF, so the derived ARRIVAL_IN_KG
# column is not persisted — confirm that is intended.
(
    sourceCSVfileDF
    .write
    .format("parquet")
    .mode("overwrite")
    .save(targetParquetFilePath)
)
# Alternative: .saveAsTable("sourceCSVfileDF") registers a managed table instead of writing files to a path.