<a href="https://colab.research.google.com/github/durgaprasadpr718/DE-Task/blob/main/DE_Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install PySpark

In [3]:
!pip install pyspark



Import necessary libraries

In [21]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    current_date,
    datediff,
    floor,
    months_between,
    to_date,
    when,
)
from pyspark.sql.types import DateType, StringType, StructField, StructType

Initialize Spark Session

In [10]:
# Build (or reuse, via getOrCreate) the notebook's Spark session.
# `spark.sql.repl.eagerEval.enabled` is switched on for notebook display of DataFrames.
spark = (
    SparkSession.builder
    .appName("CustomerDataProcessing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

Create the customer table with an explicitly defined schema

In [11]:
# Customer table layout as (column name, Spark type, nullable).
# Only Last_Consulted_Date and Post_Code may be null.
schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in [
        ("Customer_ID", StringType(), False),
        ("DOB", DateType(), False),
        ("Open_Date", DateType(), False),
        ("Last_Consulted_Date", DateType(), True),
        ("Country", StringType(), False),
        ("Post_Code", StringType(), True),
        ("Phone_Number", StringType(), False),
        ("Active_Customer", StringType(), False),
    ]
])


def _to_date(text):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date`` (required by DateType)."""
    return datetime.strptime(text, "%Y-%m-%d").date()


# Sample rows with the three date fields (DOB, Open_Date, Last_Consulted_Date)
# written as ISO strings; they are converted to dates when building `data`.
_raw_rows = [
    ("C001", "1990-06-15", "2022-01-01", "2023-10-15", "IND", "560001", "9876543210", "Yes"),
    ("C002", "1985-09-25", "2021-05-10", "2023-11-01", "USA", "10001", "1234567890", "No"),
    ("C003", "2000-03-10", "2023-02-20", "2023-09-01", "IND", "560002", "9876501234", "Yes"),
    ("C004", "1995-07-20", "2020-12-05", "2023-07-20", "IND", "560003", "7890654321", "No"),
]
data = [
    (customer_id, _to_date(dob), _to_date(opened), _to_date(consulted), *remaining)
    for customer_id, dob, opened, consulted, *remaining in _raw_rows
]

df = spark.createDataFrame(data, schema=schema)

# Display the rows, then the resolved schema.
df.show()
df.printSchema()

+-----------+----------+----------+-------------------+-------+---------+------------+---------------+
|Customer_ID|       DOB| Open_Date|Last_Consulted_Date|Country|Post_Code|Phone_Number|Active_Customer|
+-----------+----------+----------+-------------------+-------+---------+------------+---------------+
|       C001|1990-06-15|2022-01-01|         2023-10-15|    IND|   560001|  9876543210|            Yes|
|       C002|1985-09-25|2021-05-10|         2023-11-01|    USA|    10001|  1234567890|             No|
|       C003|2000-03-10|2023-02-20|         2023-09-01|    IND|   560002|  9876501234|            Yes|
|       C004|1995-07-20|2020-12-05|         2023-07-20|    IND|   560003|  7890654321|             No|
+-----------+----------+----------+-------------------+-------+---------+------------+---------------+

root
 |-- Customer_ID: string (nullable = false)
 |-- DOB: date (nullable = false)
 |-- Open_Date: date (nullable = false)
 |-- Last_Consulted_Date: date (nullable = true)
 |-

Extend the table with derived columns: age (in whole years), days since the last consultation, and a flag marking customers last consulted more than 30 days ago

In [23]:
# Derive all analysis columns from the raw customer frame in one place, without
# reassigning `df`, so re-running this cell always produces the same result.
CONSULT_THRESHOLD_DAYS = 30

df_transformed = (
    df
    # Age in whole years. months_between / 12 handles leap years and birthdays;
    # datediff / 365 over-counts by about a day every four years and can report
    # a customer a year older in the days just before their birthday.
    .withColumn("Age", floor(months_between(current_date(), col("DOB")) / 12))
    .withColumn("Days_Since_Last_Consulted", datediff(current_date(), col("Last_Consulted_Date")))
    # True when the last consultation is more than CONSULT_THRESHOLD_DAYS ago.
    # A null Last_Consulted_Date makes the comparison null, which `otherwise`
    # maps to False. NOTE(review): confirm never-consulted customers should not be flagged.
    .withColumn(
        "days_since_last_consulted_30",
        when(col("Days_Since_Last_Consulted") > CONSULT_THRESHOLD_DAYS, True).otherwise(False),
    )
)
df_transformed.show()

# Kept for backward compatibility with code that reads the flag from this name.
df_transformed1 = df_transformed

+-----------+----------+----------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|Customer_ID|       DOB| Open_Date|Last_Consulted_Date|Country|Post_Code|Phone_Number|Active_Customer|Age|days_since_last_consulted_30|Days_Since_Last_Consulted|
+-----------+----------+----------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|       C001|1990-06-15|2022-01-01|         2023-10-15|    IND|   560001|  9876543210|            Yes| 34|                        true|                      472|
|       C002|1985-09-25|2021-05-10|         2023-11-01|    USA|    10001|  1234567890|             No| 39|                        true|                      455|
|       C003|2000-03-10|2023-02-20|         2023-09-01|    IND|   560002|  9876501234|            Yes| 24|                        true|                      516|
|       C004|1995-07-20|2020

**Validations**

Validate missing data (null count per column)

In [24]:
# One aggregate per column, counting the rows where that column is null.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df_transformed.columns
]
df_transformed.select(*null_count_exprs).show()

+-----------+---+---------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|Customer_ID|DOB|Open_Date|Last_Consulted_Date|Country|Post_Code|Phone_Number|Active_Customer|Age|days_since_last_consulted_30|Days_Since_Last_Consulted|
+-----------+---+---------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|          0|  0|        0|                  0|      0|        0|           0|              0|  0|                           0|                        0|
+-----------+---+---------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+



Validate Age Calculation

In [25]:
# Spot-check the derived Age against each customer's DOB.
age_check_columns = ["Customer_ID", "DOB", "Age"]
df_transformed.select(*age_check_columns).show()

+-----------+----------+---+
|Customer_ID|       DOB|Age|
+-----------+----------+---+
|       C001|1990-06-15| 34|
|       C002|1985-09-25| 39|
|       C003|2000-03-10| 24|
|       C004|1995-07-20| 29|
+-----------+----------+---+



Validate customers whose last consultation was more than 30 days ago

In [26]:
# Keep only customers whose last consultation was more than 30 days ago.
overdue_condition = col("Days_Since_Last_Consulted") > 30
df_filtered = df_transformed.where(overdue_condition)
df_filtered.show()

+-----------+----------+----------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|Customer_ID|       DOB| Open_Date|Last_Consulted_Date|Country|Post_Code|Phone_Number|Active_Customer|Age|days_since_last_consulted_30|Days_Since_Last_Consulted|
+-----------+----------+----------+-------------------+-------+---------+------------+---------------+---+----------------------------+-------------------------+
|       C001|1990-06-15|2022-01-01|         2023-10-15|    IND|   560001|  9876543210|            Yes| 34|                        true|                      472|
|       C002|1985-09-25|2021-05-10|         2023-11-01|    USA|    10001|  1234567890|             No| 39|                        true|                      455|
|       C003|2000-03-10|2023-02-20|         2023-09-01|    IND|   560002|  9876501234|            Yes| 24|                        true|                      516|
|       C004|1995-07-20|2020