In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# Azure Storage configurations & Mount Azure Data Lake Storage
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "Client_id",
"fs.azure.account.oauth2.client.secret": 'Secret_code',
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/Talend_id/oauth2/token"}


dbutils.fs.mount(
source = "abfss://healthcare-analytics@healthcareanalyticstest.dfs.core.windows.net", # contrainer@storageacc
mount_point = "/mnt/healthcareanalytics",
extra_configs = configs)

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-357876407847897>:8[0m
[1;32m      1[0m configs [38;5;241m=[39m {[38;5;124m"[39m[38;5;124mfs.azure.account.auth.type[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124mOAuth[39m[38;5;124m"[39m,
[1;32m      2[0m [38;5;124m"[39m[38;5;124mfs.azure.account.oauth.provider.type[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124morg.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider[39m[38;5;124m"[39m,
[1;32m      3[0m [38;5;124m"[39m[38;5;124mfs.azure.account.oauth2.client.id[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124m7f0947ce-ce58-461d-93a9-9935b6df7027[39m[38;5;124m"[39m,
[1;32m      4[0m [38;5;124m"[39m[38;5;124mfs.azure.account.oauth2.client.secret[39m[38;5;124m"[39m: [38;5;124m'[39m[38;5;124mPuZ8Q~V0bbjVX7bc~~Ub6x45TZt23c1dlkK-mcHr[39m[38;5;1

In [0]:
# Display contents of the mounted directory using DBFS command
dbutils.fs.ls("/mnt/healthcareanalytics")


Out[4]: [FileInfo(path='dbfs:/mnt/healthcareanalytics/raw-data/', name='raw-data/', size=0, modificationTime=1700463294000),
 FileInfo(path='dbfs:/mnt/healthcareanalytics/transformed-data/', name='transformed-data/', size=0, modificationTime=1700463367000)]

In [0]:
# Read data from CSV file
insurance = spark.read.format("csv").option("header","true").load("/mnt/healthcareanalytics/raw-data/insurance.csv")

In [0]:
# Display the data and its schema
insurance.show()

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|    33|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [0]:
insurance.printSchema()

root
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: string (nullable = true)
 |-- children: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: string (nullable = true)



In [0]:
# Convert columns to appropriate data types
insurance = insurance.withColumn("age",col("age").cast(IntegerType()))\
    .withColumn("bmi",col("bmi").cast(DoubleType()))\
        .withColumn("children",col("children").cast(IntegerType()))\
            .withColumn("charges",col("charges").cast(DoubleType()))

In [0]:
# Display the updated schema
insurance.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [0]:
# Export transformed data to CSV
insurance.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/healthcareanalytics/transformed-data/insurance")

In [0]:
# Check for null values
null_counts = insurance.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in insurance.columns])
print("Null Counts:")
null_counts.show()

Null Counts:
+---+---+---+--------+------+------+-------+
|age|sex|bmi|children|smoker|region|charges|
+---+---+---+--------+------+------+-------+
|  0|  0|  0|       0|     0|     0|      0|
+---+---+---+--------+------+------+-------+



In [0]:
# Check data types after transformation
expected_data_types = {
    "age": IntegerType(),
    "bmi": DoubleType(),
    "children": IntegerType(),
    "charges": DoubleType()
}

for column, expected_type in expected_data_types.items():
    actual_type = insurance.schema[column].dataType
    assert actual_type == expected_type, f"Data type mismatch for column '{column}': Expected {expected_type}, got {actual_type}"

# Check basic statistics after transformation
statistics_before = insurance.describe().toPandas().set_index("summary")
statistics_after = insurance.describe().toPandas().set_index("summary")

# Verify means after transformation
for column in expected_data_types.keys():
    before_mean = float(statistics_before.loc["mean", column])
    after_mean = float(statistics_after.loc["mean", column])
    assert before_mean == after_mean, f"Mean mismatch for column '{column}': Before {before_mean}, After {after_mean}"

# Check for null values after transformation
null_counts = insurance.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in insurance.columns])
null_counts.show()

# If the assertions pass and there are no issues reported in null_counts, the transformation logic is consistent with expectations
print("Data quality checks completed successfully.")

+---+---+---+--------+------+------+-------+
|age|sex|bmi|children|smoker|region|charges|
+---+---+---+--------+------+------+-------+
|  0|  0|  0|       0|     0|     0|      0|
+---+---+---+--------+------+------+-------+

Data quality checks completed successfully.


In [0]:
# Display summary statistics
summary_stats = insurance.select("age", "bmi", "children", "charges").summary()
summary_stats.show()

+-------+------------------+------------------+-----------------+------------------+
|summary|               age|               bmi|         children|           charges|
+-------+------------------+------------------+-----------------+------------------+
|  count|              1338|              1338|             1338|              1338|
|   mean| 39.20702541106129|30.663396860986538|  1.0949177877429|13270.422265141257|
| stddev|14.049960379216147| 6.098186911679012|1.205492739781914|12110.011236693992|
|    min|                18|             15.96|                0|         1121.8739|
|    25%|                27|             26.29|                0|         4738.2682|
|    50%|                39|              30.4|                1|         9377.9047|
|    75%|                51|              34.7|                2|       16657.71745|
|    max|                64|             53.13|                5|       63770.42801|
+-------+------------------+------------------+-----------------+

In [0]:
# Set up logging for information about data quality checks
import logging

logging.basicConfig(level=logging.INFO)
logging.info("Data quality checks completed.")

INFO:root:Data quality checks completed.
