# Fixed-Length File Processing using Databricks & PySpark

This notebook demonstrates how to ingest and process fixed-length files using
PySpark on Databricks. The solution reflects a real-world data engineering use
case where legacy systems generate fixed-width files that must be transformed
into structured datasets.


## Notebook Workflow

This notebook performs the following steps:
1. Read the raw fixed-length file from Databricks storage (the paths below point to a Unity Catalog volume)
2. Define column positions and schema
3. Parse raw records into structured format
4. Apply data cleansing and transformations
5. Validate and display the final output


In [0]:
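# Import only the types used below. A wildcard import of pyspark.sql.functions
# would shadow Python built-ins such as sum, min, and max.
from pyspark.sql.types import StructField, StringType, StructType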

## Input Data Description

The input file is a fixed-length text file stored in a Unity Catalog volume (under `/Volumes/`).
Each record contains multiple fields with predefined start and end positions.
Since the file has no delimiters, column boundaries must be explicitly defined
before parsing.
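
To make the idea of explicit column boundaries concrete, here is a minimal sketch in plain Python. The sample record is invented for illustration and is not taken from the actual input file; the offsets mirror the layout this notebook defines in Step 2.

```python
# Hypothetical 23-character record, invented for illustration only.
record = "1001Widget  025   19.99"

# Slice by zero-based [start:end) offsets; there are no delimiters to split on.
pid = record[0:4]
pname = record[4:12]
quantity = record[12:15]
amount = record[18:23]

# repr() makes the padding spaces inside each field visible.
print(repr(pid), repr(pname), repr(quantity), repr(amount))
```

Note that `pname` keeps its trailing padding; trimming is a separate cleansing decision.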


In [0]:
json_path = "/Volumes/practice/default/practice/fixedlength_json.json"
csv_path = "/Volumes/practice/default/practice/FixLengthFile.csv"

## Step 1: Read Fixed-Length File from a Unity Catalog Volume

The file is read as raw text because fixed-length files do not contain
delimiters. Each row represents a single record that will be parsed
based on column positions.


In [0]:
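# Multi-line JSON file; it is loaded here but not referenced by later cells.
df_json = spark.read.format("json").option("multiline", True).load(json_path)

# Read each line as a single string column named "value". The csv reader
# would split a record into several columns wherever it contains a comma.
df_csv = spark.read.format("text").load(csv_path)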

In [0]:
df_csv.display()

## Step 2: Define Column Positions and Schema

Column positions are defined based on the fixed-width specification.
Each field is extracted using substring logic and mapped to a
meaningful column name and data type.

In [0]:
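# Offsets are zero-based. "prelength" is the start offset and "length" is the
# exclusive end offset used for slicing (it is not the field width).
# Offsets 15-17 are not mapped to any column.
schema_data = [
    {"name": "pid", "Type": "String", "prelength": 0, "length": 4},
    {"name": "pname", "Type": "String", "prelength": 4, "length": 12},
    {"name": "quantity", "Type": "String", "prelength": 12, "length": 15},
    {"name": "amount", "Type": "String", "prelength": 18, "length": 23}
]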
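
Because the `length` key holds an end offset rather than a width, it can help to derive the actual widths from the metadata. The following is a small sketch in plain Python; `field_widths` and `min_record_length` are illustrative names, not part of the original notebook.

```python
# Same layout metadata as in the cell above.
schema_data = [
    {"name": "pid", "Type": "String", "prelength": 0, "length": 4},
    {"name": "pname", "Type": "String", "prelength": 4, "length": 12},
    {"name": "quantity", "Type": "String", "prelength": 12, "length": 15},
    {"name": "amount", "Type": "String", "prelength": 18, "length": 23},
]

# Width of each field = end offset - start offset.
field_widths = {f["name"]: f["length"] - f["prelength"] for f in schema_data}

# Shortest record for which every slice is complete.
min_record_length = max(f["length"] for f in schema_data)

print(field_widths)
print(min_record_length)
```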

## Step 3: Convert DataFrame to RDD for Row-Level Processing

At this stage, the DataFrame is converted into an RDD to enable
low-level, record-by-record processing.

Parsing here is done with plain Python string slicing, which needs each
record as a raw string so that individual character positions can be
extracted. Converting the DataFrame to an RDD is one way to apply such
custom, per-record logic.

### Data Transformation Explanation

- The DataFrame contains a single column with raw fixed-length records
- Calling `.rdd` converts the DataFrame into an RDD of Row objects
- The `map(lambda x: x[0])` extracts the raw string value from each Row

After this step, the data becomes an RDD where each element represents
one complete fixed-length record as a string.


In [0]:
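# Keep only the first column of each Row, i.e. the raw record string.
rdd_lines = df_csv.rdd.map(lambda x: x[0])

# Preview a few records; collect() would pull the entire file to the driver.
rdd_lines.take(5)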

## Step 4: Enumerate Fixed-Length Schema Metadata

This step iterates over the schema definition that describes the
fixed-length file structure.

Each schema entry contains metadata such as the column name, its start
offset (`prelength`), and its end offset (`length`). Enumerating the schema
lets us check column order and field boundaries before applying extraction logic.


### Why Schema Enumeration Is Required

Printing the enumerated schema lets us check by eye:
- Column sequencing during extraction
- Start and end positions of each field
- Alignment between raw data and target schema

This is a manual safeguard against incorrect slicing of
fixed-length records; the loop itself does not validate anything.


In [0]:
# Print each schema entry together with its position in the layout.
for index, field in enumerate(schema_data):
    print(index, field)
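
The boundary checks described above can also be automated. The sketch below, in plain Python, reports overlaps and unmapped gaps between consecutive fields; `find_layout_issues` is a hypothetical helper, not part of the original notebook.

```python
schema_data = [
    {"name": "pid", "Type": "String", "prelength": 0, "length": 4},
    {"name": "pname", "Type": "String", "prelength": 4, "length": 12},
    {"name": "quantity", "Type": "String", "prelength": 12, "length": 15},
    {"name": "amount", "Type": "String", "prelength": 18, "length": 23},
]

def find_layout_issues(schema):
    """Return notes on invalid ranges, overlaps, and gaps between fields."""
    issues = []
    ordered = sorted(schema, key=lambda f: f["prelength"])
    for f in ordered:
        if f["length"] <= f["prelength"]:
            issues.append(f"{f['name']}: end offset must be greater than start offset")
    # Compare each field with the one that follows it in the record.
    for prev, curr in zip(ordered, ordered[1:]):
        if curr["prelength"] < prev["length"]:
            issues.append(f"overlap: {prev['name']} and {curr['name']}")
        elif curr["prelength"] > prev["length"]:
            issues.append(
                f"gap: offsets {prev['length']}-{curr['prelength'] - 1} "
                f"between {prev['name']} and {curr['name']} are unmapped"
            )
    return issues

issues = find_layout_issues(schema_data)
print(issues)
```

A reported gap is not necessarily an error, since fixed-width layouts often contain filler characters, but it is worth confirming against the file specification.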

## Step 5: Extract Fields from Fixed-Length Records

In this step, each raw fixed-length record is parsed into individual
fields using schema-defined character positions.

A custom mapping function is applied to extract substrings from
each record and assemble them into a structured tuple.

### Field Extraction Logic Explained

- Each RDD element represents a single raw text record
- The schema defines `prelength` (zero-based start offset) and `length` (exclusive end offset, despite its name)
- Substrings are extracted using character slicing
- Extracted values are combined into a tuple representing one structured row

The output of this step is an RDD of tuples, where each tuple
corresponds to a parsed record aligned with the target schema.


In [0]:
# Slice every record at the offsets in schema_data, giving one tuple per record.
rdd_final = rdd_lines.map(
    lambda p: tuple(p[z["prelength"]:z["length"]] for z in schema_data)
)
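
The slicing logic can be tried out without a cluster. Below is a minimal sketch as a named plain-Python function; `parse_record` is a hypothetical helper, the sample lines are invented, and trimming the padding is an added assumption that the notebook's own lambda does not make.

```python
schema_data = [
    {"name": "pid", "Type": "String", "prelength": 0, "length": 4},
    {"name": "pname", "Type": "String", "prelength": 4, "length": 12},
    {"name": "quantity", "Type": "String", "prelength": 12, "length": 15},
    {"name": "amount", "Type": "String", "prelength": 18, "length": 23},
]

def parse_record(line, schema):
    """Slice one fixed-width line into a tuple of trimmed field values."""
    # Python slicing past the end of a string yields "" instead of raising,
    # so a truncated line produces empty trailing fields.
    return tuple(line[f["prelength"]:f["length"]].strip() for f in schema)

full = parse_record("1001Widget  025   19.99", schema_data)
short = parse_record("1002Bolt", schema_data)
print(full)
print(short)
```

A function like this could presumably be passed to the RDD as `rdd_lines.map(lambda line: parse_record(line, schema_data))`, which keeps the parsing logic testable outside Spark.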

## Step 6: Build Spark Schema from Fixed-Length Metadata

In this step, a Spark `StructType` schema is dynamically constructed
using the fixed-length file metadata.

Each column name defined in the schema configuration is mapped to a
Spark `StructField`. This approach ensures that the final DataFrame
has well-defined column names and a consistent structure.

### Why Schema Construction Is Required

After parsing fixed-length records into tuples, the data lacks
column names and data types. Defining an explicit schema:

- Improves data readability and usability
- Enforces structural consistency
- Enables downstream validation and transformations
- Aligns the output with analytics and reporting requirements

### Schema Construction Logic Explained

- Iterates over the schema metadata
- Creates a `StructField` for each column name
- Assigns `StringType` to maintain flexibility during ingestion
- Combines all fields into a `StructType` schema

This schema will be applied when converting the RDD into a
structured Spark DataFrame.



In [0]:
# Every column is ingested as a nullable string; casting can happen downstream.
schema_field = [
    StructField(field["name"], StringType(), True) for field in schema_data
]

final_schema = StructType(schema_field)
print(final_schema)
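
As an alternative sketch, the same metadata can be turned into a datatype string, a form that Spark's `createDataFrame` accepts for its `schema` argument. The `to_ddl` helper and the `TYPE_NAMES` mapping are hypothetical; the metadata in this notebook only ever uses `"String"`.

```python
schema_data = [
    {"name": "pid", "Type": "String", "prelength": 0, "length": 4},
    {"name": "pname", "Type": "String", "prelength": 4, "length": 12},
    {"name": "quantity", "Type": "String", "prelength": 12, "length": 15},
    {"name": "amount", "Type": "String", "prelength": 18, "length": 23},
]

# Hypothetical mapping from metadata type labels to Spark SQL type names.
TYPE_NAMES = {"string": "string", "int": "int", "integer": "int", "double": "double"}

def to_ddl(schema):
    """Build a 'name type, name type' schema string from the layout metadata."""
    parts = []
    for f in schema:
        spark_type = TYPE_NAMES.get(f["Type"].lower())
        if spark_type is None:
            raise ValueError(f"unsupported type {f['Type']!r} for column {f['name']}")
        parts.append(f"{f['name']} {spark_type}")
    return ", ".join(parts)

ddl = to_ddl(schema_data)
print(ddl)
```

If non-string types were used, the parsed tuple values would need to be cast to matching Python types before the schema is applied, since the extraction step yields strings only.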

## Step 7: Convert Parsed RDD to Structured DataFrame

The parsed RDD of tuples is converted into a Spark DataFrame by
applying the previously defined schema.

This transformation transitions the data from low-level RDDs to
high-level DataFrames, enabling optimized Spark execution and
SQL-based analytics.


In [0]:
# Keep a reference to the DataFrame so it can be reused downstream.
df_final = rdd_final.toDF(schema=final_schema)
df_final.display()

### Final Data Transformation Flow

- Input: RDD[Tuple] representing parsed fixed-length records
- Schema Applied: StructType with meaningful column names
- Output: Spark DataFrame ready for analytics and downstream pipelines
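
The cleansing step listed in the workflow is not shown in the cells above. The following is one possible sketch in plain Python, assuming `quantity` holds an integer and `amount` a decimal number; both assumptions, along with the `clean_row` helper and the sample rows, are illustrative and not confirmed by the source.

```python
from decimal import Decimal, InvalidOperation

def clean_row(row):
    """Trim padding and convert numeric fields; unparseable values become None."""
    pid, pname, quantity, amount = (value.strip() for value in row)
    try:
        quantity_value = int(quantity)        # assumption: quantity is an integer
    except ValueError:
        quantity_value = None
    try:
        amount_value = Decimal(amount)        # assumption: amount is a decimal
    except InvalidOperation:
        amount_value = None
    return (pid, pname, quantity_value, amount_value)

good = clean_row(("1001", "Widget  ", "025", "19.99"))
bad = clean_row(("1002", "Bolt    ", "   ", "  n/a"))
print(good)
print(bad)
```

Returning `None` instead of raising keeps malformed records in the dataset so they can be counted or filtered later, which is a design choice rather than a requirement.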
