<a name="cell-id1"></a>
## Part 1: Data Schema Enforcement with PySpark
I use metadata to track schema changes: a metadata table keeps the current and the expected schema of the data up to date. When new data arrives, I compare its schema with the expected one. If they differ, I adjust the data to match, adding missing fields or casting columns to the expected data types.

Using PySpark, I read the metadata and the data, compare the schemas, and transform the data accordingly.

I consider two scenarios: adding new fields and changing the data type of existing fields. For this exercise, the source data is a CSV file.

This lets the pipeline absorb both kinds of change (new fields and new data types) without rewriting the transformation code each time the expected schema evolves.


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f79b7c6a358b60958d034efad9396dfc9d4c4dbc9785b719ef7e7af05f1e6d47
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType
from pyspark.sql.functions import lit, to_date

In [None]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Schema Evolution Management") \
    .getOrCreate()

In [None]:
# Define the expected schema
expected_schema = StructType([
    StructField("EmpID", IntegerType()),
    StructField("Name", StringType()),
    StructField("Gender", StringType()),
    # Change the type of Date_of_Birth from string to date
    StructField("Date_of_Birth", DateType()),
    StructField("Age", StringType()),
    StructField("Join_Date", StringType()),
    StructField("GROSS", StringType()),
    StructField("Net_Pay", StringType()),
    StructField("Deduction", StringType()),
    StructField("Deduction_percentage", FloatType()),
    StructField("Designation", StringType()),
    StructField("Department", StringType()),
    StructField("Tenure_in_org_in_months", IntegerType()),
    # Add these two new fields
    StructField("NumberChildren", StringType()),
    StructField("Address", StringType()),
])

In [None]:
source_data = spark.read.csv("Employee.csv", header=True)

# Get the first line of the DataFrame.
first_line = source_data.head()

# Display the first line.
print(first_line)

Row(EmpID='19575', Name='Keven Norman', Gender='M', Date_of_Birth='3/9/94', Age='25', Join_Date='2/12/19', Tenure_in_org_in_months='7', GROSS='74922', Net_Pay='71494', Deduction='3428', Deduction_percentage='4.58', Designation='Product Operations Analyst.Associate.', Department='IT Product Management & Ops')


In [None]:
# Print the schema: without inferSchema=True, spark.read.csv reads every column as a string
source_data.printSchema()

root
 |-- EmpID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Date_of_Birth: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Join_Date: string (nullable = true)
 |-- Tenure_in_org_in_months: string (nullable = true)
 |-- GROSS: string (nullable = true)
 |-- Net_Pay: string (nullable = true)
 |-- Deduction: string (nullable = true)
 |-- Deduction_percentage: string (nullable = true)
 |-- Designation: string (nullable = true)
 |-- Department: string (nullable = true)



In [None]:
# Show the first 20 rows (show() truncates values longer than 20 characters)
source_data.show()

+-----+--------------+------+-------------+---+----------+-----------------------+------+-------+---------+--------------------+--------------------+--------------------+
|EmpID|          Name|Gender|Date_of_Birth|Age| Join_Date|Tenure_in_org_in_months| GROSS|Net_Pay|Deduction|Deduction_percentage|         Designation|          Department|
+-----+--------------+------+-------------+---+----------+-----------------------+------+-------+---------+--------------------+--------------------+--------------------+
|19575|  Keven Norman|     M|       3/9/94| 25|   2/12/19|                      7| 74922|  71494|     3428|                4.58|Product Operation...|IT Product Manage...|
|19944|Kristin Werner|     F|   23/06/1994| 26|13/01/2020|                      6| 44375|  39971|     4404|                9.92|Platform Operatio...| Platform Operations|
|20055|  Avery Barber|     M|   27/02/1996| 24|14/11/2019|                      8| 82263|  77705|     4558|                5.54|Platform Operatio

In [None]:
# Compare the current schema with the expected schema
if source_data.schema == expected_schema:
    print("The schema of the loaded DataFrame matches the expected schema.")
else:
    print("The schema of the loaded DataFrame does not match the expected schema.")

The schema of the loaded DataFrame does not match the expected schema.


In [None]:
# Snapshot the schema as read from the CSV, before any column is added or cast
current_schema = source_data.schema

In [None]:
# Add every field of the expected schema that is missing from the data,
# filled with NULLs cast to the expected type
existing_columns = set(current_schema.fieldNames())
for field in expected_schema.fields:
    if field.name not in existing_columns:
        source_data = source_data.withColumn(field.name, lit(None).cast(field.dataType))

In [None]:
# Cast existing columns whose data type differs from the expected schema
expected_types = {f.name: f.dataType for f in expected_schema.fields}
for field in current_schema.fields:
    expected_type = expected_types.get(field.name)
    if expected_type is not None and field.dataType != expected_type:
        source_data = source_data.withColumn(field.name, source_data[field.name].cast(expected_type))

In [None]:
# Write the transformed data to the target storage as Parquet
source_data.write.mode("overwrite").parquet("transformed_data")

In [None]:
transformed_data = spark.read.parquet("transformed_data")

In [None]:
print("Schema of the transformed data:")
transformed_data.printSchema()

Schema of the transformed data:
root
 |-- EmpID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Date_of_Birth: date (nullable = true)
 |-- Age: string (nullable = true)
 |-- Join_Date: string (nullable = true)
 |-- Tenure_in_org_in_months: integer (nullable = true)
 |-- GROSS: string (nullable = true)
 |-- Net_Pay: string (nullable = true)
 |-- Deduction: string (nullable = true)
 |-- Deduction_percentage: float (nullable = true)
 |-- Designation: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- NumberChildren: string (nullable = true)
 |-- Address: string (nullable = true)



In [None]:
print("Transformed data:")
transformed_data.show(truncate=False)

Transformed data:
+-----+--------------+------+-------------+---+----------+-----------------------+------+-------+---------+--------------------+---------------------------------------+----------------------------------+--------------+-------+
|EmpID|Name          |Gender|Date_of_Birth|Age|Join_Date |Tenure_in_org_in_months|GROSS |Net_Pay|Deduction|Deduction_percentage|Designation                            |Department                        |NumberChildren|Address|
+-----+--------------+------+-------------+---+----------+-----------------------+------+-------+---------+--------------------+---------------------------------------+----------------------------------+--------------+-------+
|19575|Keven Norman  |M     |NULL         |25 |2/12/19   |7                      |74922 |71494  |3428     |4.58                |Product Operations Analyst.Associate.  |IT Product Management & Ops       |NULL          |NULL   |
|19944|Kristin Werner|F     |NULL         |26 |13/01/2020|6          