# Data Transformation


## Part 1) Data cleansing

In [1]:
# Load the raw revenue table from the 'LH_Bronze' lakehouse and preview it.
# Every later cell in this notebook starts from this `df`.
revenue_query = "SELECT * FROM LH_BRONZE.revenue"
df = spark.sql(revenue_query)
display(df)

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 25a85bf8-5f81-4e28-83cc-24077b0a70f0)

#### Duplicate data

**Identify duplicates**

In [3]:
# A (Branch_ID, Date_ID) pair that appears more than once indicates duplicated rows.
# groupBy() is the camel-case spelling of groupby(), and filter() is an alias of where().
(
    df.groupBy("Branch_ID", "Date_ID")
    .count()
    .filter("count > 1")
    .show()
)

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 5, Finished, Available)

+---------+-------+-----+
|Branch_ID|Date_ID|count|
+---------+-------+-----+
|   BR0161|DT00017|    4|
|   BR0111|DT00012|    3|
+---------+-------+-----+



**dropDuplicates()** 

In [4]:
# Remove rows that are exact duplicates of another row.
# Note: dropDuplicates() with no arguments compares *every* column. Two rows that share
# Branch_ID/Date_ID but differ in any other column are both kept. Pass a column subset
# to dedupe on keys instead.
# Each count() is a Spark action that runs a full job, so compute each count once and reuse it.
rows_before = df.count()
deduped = df.dropDuplicates()
rows_after = deduped.count()

print(f"Process removed {rows_before - rows_after} rows from the dataset" )

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 6, Finished, Available)

Process removed 5 rows from the dataset


**Verification** 

In [5]:
# Re-run the duplicate check on the deduplicated frame. An empty table means that no
# (Branch_ID, Date_ID) pair occurs more than once any more.
key_counts = deduped.groupBy("Branch_ID", "Date_ID").count()
key_counts.filter("count > 1").show()

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 7, Finished, Available)

+---------+-------+-----+
|Branch_ID|Date_ID|count|
+---------+-------+-----+
+---------+-------+-----+



#### Missing data / null values
**Identify missing values in a column**

In [7]:
from pyspark.sql.functions import col

# Option 1: filter() with the column's isNull() predicate.
nulls = df.filter(df["Revenue"].isNull())
display(nulls)

# Option 2: where() (an alias of filter()) with a col() column reference.
nulls2 = df.where(col("Revenue").isNull())
display(nulls2)



StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 18b3dd6b-045d-4c07-b52f-095cd57e01ec)

SynapseWidget(Synapse.DataFrame, 6bedd93a-13a1-4136-ad24-f601aed5fae4)

**Drop null values using dropna()**

In [8]:
# Drop only the rows whose Revenue is null. Nulls in other columns do not cause a drop
# because the check is restricted to the 'Revenue' column.
no_nas = df.na.drop(subset=["Revenue"])
removed = df.count() - no_nas.count()
print(f"Process removed {removed} rows from the dataset" )

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 10, Finished, Available)

Process removed 6 rows from the dataset


#### Type conversion + add new columns

In [None]:
# Inspect the column data types before converting anything.
df.printSchema()

# cast() returns a converted copy of the column. withColumn() appends it as a new column
# named 'UnitsSoldConverted' and leaves the original Units_Sold column untouched.
units_as_string = df["Units_Sold"].cast("string")
type_conv = df.withColumn("UnitsSoldConverted", units_as_string)
type_conv.printSchema()
display(type_conv)



#### Filter data

In [12]:
from pyspark.sql.functions import col

# Keep only the rows whose Revenue exceeds the threshold. where() is an alias of filter()
# and would give the same result.
# Rows with a null Revenue make the predicate evaluate to null, so they are excluded too
# and are included in the "filtered out" count below.
REVENUE_THRESHOLD = 10_000_000

filtered_df = df.filter(col("Revenue") > REVENUE_THRESHOLD)
print(f"Process filtered out {df.count() - filtered_df.count()} rows from the dataset")


StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 14, Finished, Available)

Proces filtered 1109 rows from the dataset


## Part 2) Data enrichment

#### Add new columns (withColumn or withColumns)


In [13]:
# Derive a new 'halfRevenue' column from Revenue. withColumn() returns a new DataFrame,
# so `df` itself is left unchanged.
half_revenue = df["Revenue"] / 2
enriched_df = df.withColumn("halfRevenue", half_revenue)
display(enriched_df)

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7b6450db-97fb-4132-bd48-f5ecabd5c1ca)

#### Joining and merging

In [14]:
# Load the two lookup tables that the next cell joins, and preview each one.
dealers_query = "SELECT * FROM LH_BRONZE.dealers"
dealers_df = spark.sql(dealers_query)
display(dealers_df)

countries_query = "SELECT * FROM LH_BRONZE.countries"
countries_df = spark.sql(countries_query)
display(countries_df)



StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5d05a960-7e04-4504-9150-2152a41c2b8c)

SynapseWidget(Synapse.DataFrame, 819e5891-9246-4361-8e3e-b39a130a082b)

In [15]:
# Attach each dealer's country name. join() defaults to an inner join, so a dealer
# whose Country_ID has no match in countries_df is dropped.
# Both inputs have a Country_ID column, so the selected columns are qualified by their
# source DataFrame to avoid ambiguity.
country_match = dealers_df.Country_ID == countries_df.Country_ID
joined_df = dealers_df.join(countries_df, country_match).select(
    dealers_df.Dealer_ID,
    dealers_df.Country_ID,
    countries_df.Country_Name,
)
display(joined_df)

StatementMeta(, 0e724ad0-fb74-4267-baf8-0dbc3b4ce007, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, 27217921-eec3-4274-8c45-1dd8a6499cab)