In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_absolute_error
import numpy as np
import time
import pandas as pd
import random, math, csv


In [2]:
import logging
logger = logging.getLogger(__name__)

In [3]:
# Stream de kinesis
import boto3
import json
from botocore.exceptions import ClientError
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'medical-data' 

In [4]:
# Initializer un session Spark
spark = SparkSession.builder.appName("App").getOrCreate()

In [5]:
# Fonction pour générer des dossiers patients
def generate_patient_added_event():
    event = {
            "age": random.randint(18, 65),
            "sex": random.choice(['female', 'male']),
            "bmi": round(random.uniform(15, 54), 2),
            "children": random.randint(0, 5),
            "smoker": random.choice(['no', 'yes']),
            "region": random.choice(['northwest', 'northeast', 'southwest', 'southeast']),
    }
    return event

In [6]:
# Données générées
data = [generate_patient_added_event() for _ in range(100)]

csv_file = 'generated_data.csv'

with open(csv_file, 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=data[0].keys())
    writer.writeheader()
    writer.writerows(data)

print(f"Data written to {csv_file}")

Data written to generated_data.csv


In [7]:
# Fonction pour mettre les dossiers patients sur le stream de Kinesis
def put_record(data, partition_key):
    try:
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key
        )
        logger.info("Put record in stream %s.", stream_name)
        print(f"Data {data} published into stream {stream_name} successfully.")
    except ClientError:
        logger.exception("Couldn't put record in stream %s.", stream_name)
        raise
    else:
        return response

In [8]:
# Décommentez pour démarrer le flux
"""if __name__ == '__main__':
    try:
        while True:  
            event_data = generate_patient_added_event()
            partition_key = f"PK{random.randint(1, 7)}"

            put_record(event_data, partition_key)

            time.sleep(5)
    except KeyboardInterrupt:
        print("\nScript stopped by manual intervention!")"""

'if __name__ == \'__main__\':\n    try:\n        while True:  \n            event_data = generate_product_added_event()\n            partition_key = f"PK{random.randint(1, 7)}"\n\n            put_record(event_data, partition_key)\n\n            time.sleep(5)\n    except KeyboardInterrupt:\n        print("\nScript stopped by manual intervention!")'

In [9]:
# CSV à Parquet conversion pour Spark
dfp = pd.DataFrame(data)
df = pd.read_csv('generated_data.csv')

df.to_parquet('generated_data.parquet')

In [10]:
file_path_one = './generated_data.parquet'
gendata = spark.read.parquet(file_path_one)
gendata.show(5)
gendata

+---+------+-----+--------+------+---------+
|age|   sex|  bmi|children|smoker|   region|
+---+------+-----+--------+------+---------+
| 41|female|38.36|       3|    no|southeast|
| 38|  male| 34.9|       2|    no|southwest|
| 38|female|47.72|       2|   yes|northwest|
| 23|female|17.82|       4|    no|southeast|
| 20|female|35.53|       3|   yes|northeast|
+---+------+-----+--------+------+---------+
only showing top 5 rows



DataFrame[age: bigint, sex: string, bmi: double, children: bigint, smoker: string, region: string]

In [11]:
#Le type de chaque attribue
event_data = generate_patient_added_event()
event_data
for key, value in event_data.items():
    print(f"Key: {key}, Type: {type(value)}")

Key: age, Type: <class 'int'>
Key: sex, Type: <class 'str'>
Key: bmi, Type: <class 'float'>
Key: children, Type: <class 'int'>
Key: smoker, Type: <class 'str'>
Key: region, Type: <class 'str'>


In [12]:
file_path = './insurance.parquet'
insurance_df = spark.read.parquet(file_path)
insurance_df.show(5)
insurance_df

+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
+---+------+------+--------+------+---------+-----------+
only showing top 5 rows



DataFrame[age: bigint, sex: string, bmi: double, children: bigint, smoker: string, region: string, charges: double]

In [13]:
# Diviser les données en deux pour l'apprentissage automatique
train_df, test_df = insurance_df.randomSplit([.8, .2], seed=42)

In [20]:
# Les opérations pour convertir les données catégoriques afin de les inclure dans l'apprentissage automatique
categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]

index_output_cols = [x + "Index" for x in categorical_cols]

ohe_output_cols = [x + "OHE" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double" or dataType == "bigint" ) & (field != "charges"))]

assembler_inputs = numeric_cols + ohe_output_cols

vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [22]:
# Les opérations d'apprentissage automatique
lr = LinearRegression(labelCol="charges", featuresCol="features", predictionCol="prediction_lr")

rfr = RandomForestRegressor(labelCol="charges", featuresCol="features", predictionCol="prediction_rfr")

gbt = GBTRegressor(labelCol="charges", featuresCol="features", predictionCol="prediction_gbt")

pipeline = Pipeline(stages = [string_indexer, ohe_encoder, vec_assembler, lr, rfr, gbt])

pipeline_model = pipeline.fit(train_df)

pred_df = pipeline_model.transform(test_df)

pred_df.select("features", "charges", "prediction_lr", "prediction_rfr", "prediction_gbt").show(20)
type(pred_df)


+--------------------+-----------+------------------+------------------+------------------+
|            features|    charges|     prediction_lr|    prediction_rfr|    prediction_gbt|
+--------------------+-----------+------------------+------------------+------------------+
|[18.0,24.09,1.0,1...|  2201.0971| 308.0785935124295| 5563.733118618825| 7644.366389283847|
|[18.0,27.28,3.0,1...| 18223.4512|26033.038976080512|19734.727382680612|18955.511355551087|
|[18.0,29.165,0.0,...|7323.734819|2741.4005389676386| 4390.787045228075|3226.3080245374485|
|[18.0,31.35,0.0,1...|  1622.1885|2385.8281368435964| 5392.017914279037|3237.0723283114107|
|[18.0,35.625,0.0,...| 2211.13075| 5058.635876565417| 5925.249686865232| 4697.659051744988|
|[18.0,38.17,0.0,1...|  1631.6683| 4832.197332264159|  5505.72593005443|4037.9878465258766|
|[18.0,40.185,0.0,...| 2217.46915| 6694.331408987377|5305.8546573876765| 4558.879697741631|
|(8,[0,1,4,7],[18....|  1702.4553|-248.3999833738526| 4190.558226857154| 2058.54

pyspark.sql.dataframe.DataFrame

In [24]:
lr_model = pipeline_model.stages[-3]
lcr = []

for i in range(len(lr_model.coefficients)):
    lcr.append(round(lr_model.coefficients[i], 2))
lr_i = round(lr_model.intercept, 2)

print("For linear regression:\n")
print(f"charges = {lcr[0]}*age + {lcr[1]}*bmi + {lcr[2]}*children + {lcr[3]}*sex + {lcr[4]}*smoker + {lcr[5]}*regionOHE1 + {lcr[6]}*regionOHE2 + {lcr[7]}*regionOHE3 + {lr_i}")

For linear regression:

charges = 252.24*age + 358.71*bmi + 526.45*children + 229.56*sex + -23527.79*smoker + -278.62*regionOHE1 + 506.88*regionOHE2 + 860.72*regionOHE3 + 10176.96


In [27]:
#Les résultats pour mean squared error
mse_lr = mean_squared_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_lr").collect()))
mse_rfr = mean_squared_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_rfr").collect()))
mse_gbt = mean_squared_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_gbt").collect()))
print(math.sqrt(mse_lr))
print(math.sqrt(mse_rfr))
print(math.sqrt(mse_gbt))


5660.558250981996
4479.665155130369
5077.150858402546


In [28]:
#Les résultats pour mean absolute error
mae_lr = mean_absolute_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_lr").collect()))
mae_rfr = mean_absolute_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_rfr").collect()))
mae_gbt = mean_absolute_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_gbt").collect()))
print(mae_lr)
print(mae_rfr)
print(mae_gbt)

3960.005525917079
2909.6562143155616
2864.958810031765


In [29]:
#Les résultats pour mean absolute percentage error
mape_lr = mean_absolute_percentage_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_lr").collect()))
mape_rfr = mean_absolute_percentage_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_rfr").collect()))
mape_gbt = mean_absolute_percentage_error(np.array(pred_df.select("charges").collect()), np.array(pred_df.select("prediction_gbt").collect()))
print(mape_lr)
print(mape_rfr)
print(mape_gbt)

0.3818657533655208
0.4342880909660014
0.3065796460605302


In [30]:
pred_df.toPandas().to_csv('prediction_insurance.csv')