In [0]:
csv_path = 'dbfs:/FileStore/data_profiling.csv'
json_path = 'dbfs:/FileStore/json_data_profiling.json'

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
schema = StructType([
    StructField("Customer_ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Age", StringType(), True),  
    StructField("Purchase_Amount", StringType(), True),  
    StructField("Signup_Date", StringType(), True) 
])


In [0]:
df = spark.read.csv(csv_path,schema=schema, header = True)

In [0]:
df.display()


Customer_ID,Name,Email,Age,Purchase_Amount,Signup_Date
1,Alice,alice@example.com,25,100.0,2024-01-10
2,Bob,bob@example.com,,500.5,01-12-2024
3,Charlie,charlie@domain,30,,2024/02/15
4,David,,28,300.0,05/03/2024
5,Eve,eve@example.com,Twenty-Two,700.75,2024-01-25
1,Alice,alice@example.com,25,100.0,2024-01-10
7,John,JOHN@GMAIL.COM,40,50.5,Feb 20
8,Mike,mike@example.com,35,,2024-01-30


In [0]:
# in age column replacing non numerics with empty space
# in Purchase_Amount column replacing NA with None oterwise casting it to double type
# in Signup_Date column converting values from string type to date type and formatting it
# in Email column removing white spaces from both the ends and converting it to lower case


df_fixed = df.withColumn("Age", regexp_replace(col("Age"), "[^0-9]", "").cast(IntegerType())) \
             .withColumn("Purchase_Amount", when(col("Purchase_Amount") == "NA", lit(None)).otherwise(col("Purchase_Amount")).cast(DoubleType())) \
             .withColumn("Signup_Date", to_date(regexp_replace(trim(col("Signup_Date")), "/", "-"), "yyyy-MM-dd"))\
                 .withColumn("Email", lower(trim((col("Email")))))


In [0]:
df_fixed.display()

Customer_ID,Name,Email,Age,Purchase_Amount,Signup_Date
1,Alice,alice@example.com,25.0,100.0,2024-01-10
2,Bob,bob@example.com,,500.5,
3,Charlie,charlie@domain,30.0,,2024-02-15
4,David,none,28.0,300.0,
5,Eve,eve@example.com,,700.75,2024-01-25
1,Alice,alice@example.com,25.0,100.0,2024-01-10
7,John,john@gmail.com,40.0,50.5,
8,Mike,mike@example.com,35.0,,2024-01-30


In [0]:
# counting null values for each column
df_fixed.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).display()

Customer_ID,Name,Email,Age,Purchase_Amount,Signup_Date
0,0,0,2,2,3


In [0]:
df_fixed.printSchema()


root
 |-- Customer_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Purchase_Amount: double (nullable = true)
 |-- Signup_Date: date (nullable = true)



In [0]:
final_df = df_fixed.distinct()
final_df.orderBy(col('Customer_ID')).display()

Customer_ID,Name,Email,Age,Purchase_Amount,Signup_Date
1,Alice,alice@example.com,25.0,100.0,2024-01-10
2,Bob,bob@example.com,,500.5,
3,Charlie,charlie@domain,30.0,,2024-02-15
4,David,none,28.0,300.0,
5,Eve,eve@example.com,,700.75,2024-01-25
7,John,john@gmail.com,40.0,50.5,
8,Mike,mike@example.com,35.0,,2024-01-30


In [0]:
json_df = spark.read.json(json_path, multiLine=True)

In [0]:
json_df.display()

address,age,email,id,name,orders
"List(Springfield, 123 Elm St, 12345)",30,john.doe@example.com,1,John Doe,"List(List(1200.5, 2022-01-15, 101, Laptop), List(800.99, 2022-03-22, 102, Smartphone))"
"List(Metropolis, 456 Oak St, 67890)",27,jane.smith@example.com,2,Jane Smith,"List(List(499.99, 2022-04-10, 103, Tablet))"
"List(Gotham, 789 Pine St, 11223)",22,sam.brown@example.com,3,Sam Brown,"List(List(150.75, 2022-06-05, 104, Headphones), List(350.0, 2022-07-19, 105, Monitor))"


In [0]:
# exloding orders, address columns
# droping oridinal , address columns after exploding
# converting Zip column to integer type
# converting date column to date type 

exploded_df = json_df.withColumn('orders', explode('orders')).select('*', 'orders.*', 'address.*').drop('orders', 'address').withColumn('Zip', col('Zip').cast(IntegerType())).withColumn('date', to_date(col('date'),  "yyyy-MM-dd"))

In [0]:
exploded_df.display()

age,email,id,name,amount,date,order_id,product,city,street,Zip
30,john.doe@example.com,1,John Doe,1200.5,2022-01-15,101,Laptop,Springfield,123 Elm St,12345
30,john.doe@example.com,1,John Doe,800.99,2022-03-22,102,Smartphone,Springfield,123 Elm St,12345
27,jane.smith@example.com,2,Jane Smith,499.99,2022-04-10,103,Tablet,Metropolis,456 Oak St,67890
22,sam.brown@example.com,3,Sam Brown,150.75,2022-06-05,104,Headphones,Gotham,789 Pine St,11223
22,sam.brown@example.com,3,Sam Brown,350.0,2022-07-19,105,Monitor,Gotham,789 Pine St,11223


In [0]:
# counting null values for each column
exploded_df.select([count(when(col(c).isNull(), c)).alias(c) for c in exploded_df.columns]).display()

age,email,id,name,amount,date,order_id,product,city,street,Zip
0,0,0,0,0,0,0,0,0,0,0


In [0]:
exploded_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- date: date (nullable = true)
 |-- order_id: long (nullable = true)
 |-- product: string (nullable = true)
 |-- city: string (nullable = true)
 |-- street: string (nullable = true)
 |-- Zip: integer (nullable = true)

