In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
# Build the notebook's Spark session: local master with two worker threads.
# getOrCreate() hands back an already-running session when one exists; Spark
# then warns that only runtime SQL configurations take effect.
spark = (
    SparkSession.builder
    .appName("DataFrame Heading Cleaning and Renaming")
    .master("local[2]")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

23/10/15 03:35:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Define the options and configurations
# Reader options passed to the CSV loader as keyword arguments.
options = dict(
    header="true",         # first row holds the column names
    inferSchema="true",    # let Spark infer each column's type
    sep=",",               # field separator (',' is also the default)
    encoding="UTF-8",      # character encoding of the file
    mode="DROPMALFORMED",  # rows that cannot be parsed are dropped
)

In [6]:
# Relative path of the input CSV whose headers get cleaned in this notebook.
csv_file_path = "input-data/person_data.csv"

In [23]:
# Load the CSV with the reader options defined earlier, then print the schema
# to see the raw header names exactly as they appear in the file.
df = spark.read.options(**options).csv(csv_file_path)
df.printSchema()

root
 |-- Person@ ID: integer (nullable = true)
 |-- First, Name: string (nullable = true)
 |-- Last! Name: string (nullable = true)
 |-- Date* Of Birth: date (nullable = true)
 |-- Gender#: string (nullable = true)
 |-- Email$ Address: string (nullable = true)
 |-- Phone% Number: long (nullable = true)
 |-- Address^: string (nullable = true)
 |-- City (State): string (nullable = true)
 |-- Zip Code|: integer (nullable = true)
 |-- Country~: string (nullable = true)
 |-- Nationality`: string (nullable = true)
 |-- Occupation_: string (nullable = true)
 |-- Education+: string (nullable = true)
 |-- Marital-Status: string (nullable = true)
 |-- Registration. Date: date (nullable = true)
 |-- Registration TIme: timestamp (nullable = true)



In [24]:
import re
# Define a custom function to clean column names
def clean_column_name(name):
    """Convert a raw column header into a lowercase snake_case name.

    Every run of characters other than ASCII letters and digits (spaces,
    punctuation, existing underscores) is collapsed into a single underscore,
    underscores left at either end are stripped, and the result is lowercased.

    Args:
        name: Original column name.

    Returns:
        The cleaned name, e.g. ``"Person@ ID"`` -> ``"person_id"``. A name
        containing no letters or digits yields an empty string.
    """
    # Substitute '_' rather than ' ': with a space as the replacement the
    # strip('_') below could never match, so headers such as "Gender#" came
    # back with a trailing space and multi-word headers kept inner spaces.
    cleaned_name = re.sub(r'[^a-zA-Z0-9]+', '_', name).strip('_').lower()
    return cleaned_name

# Rename every column to the name produced by clean_column_name.
# df.columns is evaluated once when the loop starts, so rebinding df inside
# the body does not disturb the iteration.
for original_name in df.columns:
    df = df.withColumnRenamed(original_name, clean_column_name(original_name))

# Remove leading and trailing underscores from column names
def remove_leading_trailing_underscores_from_column_names(df, leading_trailing_underscores=True):
    """Strip underscores from both ends of every column name.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed`` (the
            notebook passes a Spark DataFrame).
        leading_trailing_underscores: When falsy, no renaming is done and
            ``df`` itself is returned.

    Returns:
        The result of chaining one ``withColumnRenamed`` call per column.
    """
    result = df
    original_names = list(df.columns)
    # The flag does not change between columns, so it is checked once.
    if leading_trailing_underscores:
        for original_name in original_names:
            result = result.withColumnRenamed(original_name, original_name.strip('_'))
    return result

# Run the underscore-trimming step (flag spelled out; True is also the default).
df = remove_leading_trailing_underscores_from_column_names(
    df, leading_trailing_underscores=True
)

# Remove leading and trailing spaces from column names
def remove_leading_trailing_spaces_from_column_names(df, leading_trailing_spaces=True):
    """Strip whitespace from both ends of every column name.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed`` (the
            notebook passes a Spark DataFrame).
        leading_trailing_spaces: When falsy, no renaming is done and ``df``
            itself is returned.

    Returns:
        The result of chaining one ``withColumnRenamed`` call per column.
    """
    result = df
    original_names = list(df.columns)
    # The flag does not change between columns, so it is checked once.
    if leading_trailing_spaces:
        for original_name in original_names:
            result = result.withColumnRenamed(original_name, original_name.strip())
    return result

# Run the whitespace-trimming step (flag spelled out; True is also the default).
df = remove_leading_trailing_spaces_from_column_names(
    df, leading_trailing_spaces=True
)

# Join words with underscores in column names
def join_words_with_spaces_in_column_names(df, join_by_space=True):
    """Join the whitespace-separated words of every column name with underscores.

    Despite the function and parameter names, the separator written into the
    new names is ``"_"``; whitespace is only what the words are split on.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed`` (the
            notebook passes a Spark DataFrame).
        join_by_space: When falsy, no renaming is done and ``df`` itself is
            returned.

    Returns:
        The result of chaining one ``withColumnRenamed`` call per column.
    """
    result = df
    original_names = list(df.columns)
    # The flag does not change between columns, so it is checked once.
    if join_by_space:
        for original_name in original_names:
            words = original_name.split()
            result = result.withColumnRenamed(original_name, "_".join(words))
    return result

# Run the word-joining step (flag spelled out; True is also the default).
df = join_words_with_spaces_in_column_names(df, join_by_space=True)

# Convert column names to lowercase
def lowercase_dataframe_column_names(df, lower_case=True):
    """Lowercase every column name.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed`` (the
            notebook passes a Spark DataFrame).
        lower_case: When falsy, no renaming is done and ``df`` itself is
            returned.

    Returns:
        The result of chaining one ``withColumnRenamed`` call per column.
    """
    result = df
    original_names = list(df.columns)
    # The flag does not change between columns, so it is checked once.
    if lower_case:
        for original_name in original_names:
            result = result.withColumnRenamed(original_name, original_name.lower())
    return result

# Run the lowercasing step (flag spelled out; True is also the default).
df = lowercase_dataframe_column_names(df, lower_case=True)

# Print the schema to confirm the cleaned column names
df.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- phone_number: long (nullable = true)
 |-- address: string (nullable = true)
 |-- city_state: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- education: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- registration_time: timestamp (nullable = true)



In [25]:
# Read the CSV again with the same reader options so that df once more
# carries the raw header names, then print the schema to confirm.
df = spark.read.options(**options).csv(csv_file_path)
df.printSchema()

root
 |-- Person@ ID: integer (nullable = true)
 |-- First, Name: string (nullable = true)
 |-- Last! Name: string (nullable = true)
 |-- Date* Of Birth: date (nullable = true)
 |-- Gender#: string (nullable = true)
 |-- Email$ Address: string (nullable = true)
 |-- Phone% Number: long (nullable = true)
 |-- Address^: string (nullable = true)
 |-- City (State): string (nullable = true)
 |-- Zip Code|: integer (nullable = true)
 |-- Country~: string (nullable = true)
 |-- Nationality`: string (nullable = true)
 |-- Occupation_: string (nullable = true)
 |-- Education+: string (nullable = true)
 |-- Marital-Status: string (nullable = true)
 |-- Registration. Date: date (nullable = true)
 |-- Registration TIme: timestamp (nullable = true)



In [26]:
import re
# Define a custom function to clean column names
def _transform_column_name(
        name,
        remove_special_chars,
        remove_underscores,
        remove_spaces,
        join_with_underscores,
        lowercase
        ):
    """Apply the enabled clean-up steps to a single column name and return it."""
    if remove_special_chars:
        # Each run of non-alphanumeric characters (underscores included)
        # becomes one space, which keeps word boundaries for the join step.
        name = re.sub(r'[^a-zA-Z0-9]+', ' ', name)
    # Trim the edges until nothing changes. A single underscore pass followed
    # by a single whitespace pass leaves underscores behind when they sit
    # inside surrounding whitespace (e.g. " _foo_ " -> "_foo_").
    before_trim = None
    while before_trim != name:
        before_trim = name
        if remove_underscores:
            name = name.strip('_')
        if remove_spaces:
            name = name.strip()
    if join_with_underscores:
        name = "_".join(name.split())
    if lowercase:
        name = name.lower()
    return name


def clean_and_transform_column_names(
        df,
        remove_special_chars=True, 
        remove_underscores=True,
        remove_spaces=True, 
        join_with_underscores=True, 
        lowercase=True
        ):
    """Return ``df`` with its column names cleaned according to the flags.

    Steps are applied to each name in this order; each can be switched off:

    1. ``remove_special_chars``: replace every run of characters other than
       ASCII letters and digits with a single space.
    2. ``remove_underscores`` / ``remove_spaces``: trim underscores and/or
       whitespace from both ends of the name.
    3. ``join_with_underscores``: join whitespace-separated words with ``_``.
    4. ``lowercase``: lowercase the name.

    Args:
        df: Object exposing ``columns`` and ``toDF(*names)`` (the notebook
            passes a Spark DataFrame).
        remove_special_chars: Enable step 1.
        remove_underscores: Trim underscores in step 2.
        remove_spaces: Trim whitespace in step 2.
        join_with_underscores: Enable step 3.
        lowercase: Enable step 4.

    Returns:
        ``df`` itself when no name changes, otherwise the result of
        ``df.toDF(*new_names)``.

    Note:
        Different headers can clean to the same name (for example ``"Name#"``
        and ``"Name$"``); no de-duplication is attempted here.
    """
    original_names = list(df.columns)
    new_names = [
        _transform_column_name(
            original_name,
            remove_special_chars,
            remove_underscores,
            remove_spaces,
            join_with_underscores,
            lowercase,
        )
        for original_name in original_names
    ]
    if new_names == original_names:
        # Nothing to rename; also avoids calling toDF() with no names.
        return df
    # Rename all columns positionally in one call instead of stacking one
    # withColumnRenamed call per column.
    return df.toDF(*new_names)

# Run every clean-up step in one call; each flag is written out at its default
# value so the enabled steps are visible at the call site.
df = clean_and_transform_column_names(
    df,
    remove_special_chars=True,
    remove_underscores=True,
    remove_spaces=True,
    join_with_underscores=True,
    lowercase=True,
)

# Print the schema to confirm the cleaned column names
df.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- phone_number: long (nullable = true)
 |-- address: string (nullable = true)
 |-- city_state: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- education: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- registration_time: timestamp (nullable = true)

