In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

# Obtain the notebook's Spark session (application name "Deals") and keep
# Spark's log output at WARN level.
spark = (
    SparkSession.builder
    .appName("Deals")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Read the deals CSV into a Spark DataFrame: the first row supplies the
# column names and the column types are inferred from the data.
deals = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/Deals.csv")
)

In [12]:
# PANDAS
# deals = deals.dropna(how='all')
# deals.columns = deals.columns.str.lower().str.replace(' ', '_')

# SPARK
def basic_clean(df):
    """Apply the baseline clean-up that precedes the column-specific fixes.

    Mirrors both steps of the pandas reference implementation:

    1. drop rows in which every column is null (``dropna(how='all')``);
    2. normalise column names to lower case with spaces replaced by ``_``.

    Args:
        df: Spark DataFrame to clean.

    Returns:
        A new DataFrame with fully-empty rows removed and normalised column
        names. The ``df`` object passed in is not modified.
    """
    # The pandas version drops all-null rows before renaming; do the same
    # here so the two implementations agree.
    df = df.dropna(how='all')

    normalised = [name.lower().replace(' ', '_') for name in df.columns]

    # toDF renames every column positionally in one step, instead of
    # stacking one withColumnRenamed call per column.
    return df.toDF(*normalised)

# Rebind `deals` to the DataFrame returned by basic_clean.
deals = basic_clean(deals)

In [13]:
# PANDAS
# deals.loc[deals['loan_rate'] > 100, 'loan_rate'] = np.nan

# SPARK
def fix_loan_rate(df):
    """Blank out loan rates above 100.

    Args:
        df: Spark DataFrame containing a ``loan_rate`` column.

    Returns:
        A DataFrame whose ``loan_rate`` column is null wherever the original
        value is greater than 100 and otherwise keeps the original value.
    """
    rate = col('loan_rate')
    # No replacement value is supplied for the matching branch, so rows over
    # the threshold become null; everything else falls through to `rate`.
    cleaned_rate = when(rate > 100, None).otherwise(rate)
    return df.withColumn('loan_rate', cleaned_rate)

# Rebind `deals` to the DataFrame returned by fix_loan_rate.
deals = fix_loan_rate(deals)

In [14]:
# PANDAS
# deals.dropna(subset=['customer_id', 'broker_id', 'property_id'], inplace=True)

# SPARK
def drop_missing_ids(df):
    """Discard rows that lack any of the three identifier columns.

    Args:
        df: Spark DataFrame with ``customer_id``, ``broker_id`` and
            ``property_id`` columns.

    Returns:
        A DataFrame containing only the rows where none of those three
        columns is missing.
    """
    id_columns = ['customer_id', 'broker_id', 'property_id']
    # how='any' (the default) drops a row as soon as one listed column is missing.
    return df.dropna(how='any', subset=id_columns)

# Rebind `deals` to the DataFrame returned by drop_missing_ids.
deals = drop_missing_ids(deals)

In [15]:
# PANDAS
# deals.to_csv('clean_deals.csv', index=False)

# SPARK
def save_csv(df, path):
    """Write ``df`` as CSV (with a header row) to ``path``, replacing any existing output.

    Args:
        df: Spark DataFrame to write.
        path: Destination passed to the DataFrame writer's ``csv`` call.

    Returns:
        None.
    """
    # Collapse to one partition before writing.
    single_partition = df.coalesce(1)
    writer = single_partition.write.mode("overwrite")
    writer.csv(path, header=True)

# Persist the cleaned deals via save_csv.
# NOTE(review): Spark's CSV writer is expected to create a directory named
# 'clean_deals.csv' with part file(s) inside, not a single file as pandas'
# to_csv does; confirm downstream readers expect that.
save_csv(deals, 'clean_deals.csv')