In [19]:
from cassandra.cluster import Cluster
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

from pyspark.sql import SQLContext
import numpy as np
from pyspark.sql.functions import split, col

In [20]:
#new spark session 
spark = SparkSession.builder.appName('PPA detection').getOrCreate()

In [21]:
#connection to cassandra database and cnas keyspace
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('cnas')

In [22]:
rows = session.execute('select * from cnas   ALLOW FILTERING;')


In [23]:
dftable = pd.DataFrame(list(rows))

In [24]:
# transformation :

#remplacer None avec -1 ( aucune affection) dans la coloumn affection
dftable.affection.fillna(value=-1, inplace=True)

#remplacer None avec 0 ( aucune quantitée rejetée ) dans la coloumn qte_rejet
dftable.qte_rejet.fillna(value=0, inplace=True)


# delete rows where the quantite_med == 0
dftable.drop(dftable[dftable['quantite_med'] == 0].index, inplace = True)

#delete rejected quantity , but before this , we need to save the rows having quantity rejected > 0
#df_rejected = dftable[dftable['qte_rejet'] > 0]
#dftable.drop(dftable[dftable['qte_rejet'] > 0].index, inplace = True)


#remplacer None avec 0 ( aucune durée spécifiée ) dans la coloumn duree_traitement
dftable.duree_traitement.fillna(value=0, inplace=True)

#change the type of some lines
dftable = dftable.astype({"affection": str})
dftable = dftable.astype({"fk": float})
dftable = dftable.astype({"age": int})


In [25]:
# garder les coloumns qu'on est besoin 
dftable=dftable[['id','fk','codeps','affection','age','applic_tarif','date_paiement','num_enr','sexe','ts','quantite_med','qte_rejet']]
# print the columns that we need : dftable.info()

In [26]:
# split the table into two table : rejected one and accepted one
rejected = dftable[dftable['qte_rejet'] > 0]
accepted = dftable[dftable['qte_rejet'] == 0]

In [27]:
rejected

Unnamed: 0,id,fk,codeps,affection,age,applic_tarif,date_paiement,num_enr,sexe,ts,quantite_med,qte_rejet
711,119493.0,6.601690e+32,4501000283,-1,56,O,2022-01-26,01294,M,N,1.0,1.0
883,113252.0,3.801150e+32,4501000283,8007,84,O,2022-01-26,04419,M,N,1.0,1.0
918,113060.0,7.810300e+32,4501000283,-1,44,O,2022-01-26,05406,F,N,1.0,1.0
1245,110861.0,2.837040e+32,4501000283,80,94,O,2022-01-20,05849,F,N,3.0,3.0
1311,110155.0,4.506930e+32,4501000283,-1,77,O,2022-01-20,02638,F,N,1.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...
136157,4462.0,4.211230e+32,4501303568,191780,74,N,2022-01-11,01393,F,N,2.0,1.0
136190,3847.0,4.109960e+32,4501303568,8017,51,O,2022-01-11,05628,F,N,3.0,1.0
136226,2896.0,3.913080e+32,4501303568,8082,79,N,2022-01-11,06812,F,N,3.0,2.0
136345,49.0,1.525640e+32,4501303568,-1,28,N,2022-01-11,06513,F,N,1.0,1.0


In [28]:
sparkdf = spark.createDataFrame(accepted)
rejected_sparkdf = spark.createDataFrame(rejected)

In [30]:
#transform the affection column to array of int ( splited by ',')
sparkdf = sparkdf.withColumn("affection", split(col("affection"), ",").cast("array<int>"))

In [31]:
#sort the affection array 
import pyspark.sql.functions as F
sparkdf = sparkdf.withColumn('affection', F.array_sort('affection'))

In [32]:
## put the age in ranges
from pyspark.sql.functions import udf
@udf("String")
def age_range(age):
    if age >= 0 and age <= 5:
        return '0-5'
    elif age > 5 and age <= 10:
        return '6-10'
    elif age > 10 and age <= 16:
        return '11-16' 
    elif age > 16 and age <= 24:
        return '17-24' 
    elif age > 24 and age <= 60:
        return '25-60' 
    elif age > 60 and age <= 76:
        return '61-76' 
    else:
        return '75+'
    


sparkdf = sparkdf.withColumn("age", age_range(col("age")))

In [33]:
# transform the affection column to a string again ( so we can index it)
from pyspark.sql.functions import col, concat_ws
sparkdf = sparkdf.withColumn("affection",
   concat_ws(",",col("affection")))

In [34]:
### Handling Categorical Features
from pyspark.ml.feature import StringIndexer
indexer=StringIndexer(inputCols=["sexe","applic_tarif","ts","affection","age"],outputCols=["sex_indexed","applic_tarif_indexed",
                                                                         "ts_indexes","affection_indexes","age_indexes"])
df_r=indexer.setHandleInvalid("keep").fit(sparkdf).transform(sparkdf)


22/06/21 01:49:04 WARN TaskSetManager: Stage 0 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [35]:
# call the model and fit it 
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['id','fk','age_indexes','sex_indexed','affection_indexes',
                          'ts_indexes','quantite_med',],outputCol="Independent Features")
output=featureassembler.transform(df_r)

In [36]:
#prepare the data to fit it to the model 
finalized_data=output.select("Independent Features","quantite_med")

In [37]:
# call and fit the model 
from pyspark.ml.regression import LinearRegression
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='quantite_med', maxIter=10, regParam=0.3, elasticNetParam=0.8)
regressor=regressor.fit(train_data)

22/06/21 01:53:35 WARN TaskSetManager: Stage 3 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.
22/06/21 01:53:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/06/21 01:53:36 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/06/21 01:53:37 WARN TaskSetManager: Stage 5 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [38]:
print("Coefficients: %s" % str(regressor.coefficients))
print("Intercept: %s" % str(regressor.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.7939201046267776]
Intercept: 0.3441251379654925


In [39]:
### Predictions
pred_results=regressor.evaluate(test_data)
## Final comparison
pred_results.predictions.show()

22/06/21 01:55:13 WARN TaskSetManager: Stage 7 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.
22/06/21 01:55:14 WARN TaskSetManager: Stage 9 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+------------+------------------+
|Independent Features|quantite_med|        prediction|
+--------------------+------------+------------------+
|(7,[0,1,6],[1.0,1...|         1.0|1.1380452425922702|
|(7,[0,1,6],[102.0...|         1.0|1.1380452425922702|
|(7,[0,1,6],[536.0...|         2.0|1.9319653472190477|
|(7,[0,1,6],[1187....|         1.0|1.1380452425922702|
|(7,[0,1,6],[1245....|         3.0|2.7258854518458255|
|(7,[0,1,6],[1918....|         1.0|1.1380452425922702|
|(7,[0,1,6],[2110....|         1.0|1.1380452425922702|
|(7,[0,1,6],[2130....|         2.0|1.9319653472190477|
|(7,[0,1,6],[2131....|         1.0|1.1380452425922702|
|(7,[0,1,6],[2132....|         2.0|1.9319653472190477|
|(7,[0,1,6],[2271....|         3.0|2.7258854518458255|
|(7,[0,1,6],[2275....|         2.0|1.9319653472190477|
|(7,[0,1,6],[2459....|         1.0|1.1380452425922702|
|(7,[0,1,6],[2511....|         3.0|2.7258854518458255|
|(7,[0,1,6],[2514....|         1.0|1.1380452425922702|
|(7,[0,1,6

In [40]:
### Performance Metrics
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.9575305154100787, 0.18358133832686163, 0.08416800767901612)

In [41]:
Final_result = pred_results.predictions.where("quantite_med > prediction ")

from pyspark.sql.functions import round, col
Final_result = Final_result.select("Independent Features"  , "quantite_med", round(col('prediction')))
Final_result.show(50)

+--------------------+------------+--------------------+
|Independent Features|quantite_med|round(prediction, 0)|
+--------------------+------------+--------------------+
|(7,[0,1,6],[536.0...|         2.0|                 2.0|
|(7,[0,1,6],[1245....|         3.0|                 3.0|
|(7,[0,1,6],[2130....|         2.0|                 2.0|
|(7,[0,1,6],[2132....|         2.0|                 2.0|
|(7,[0,1,6],[2271....|         3.0|                 3.0|
|(7,[0,1,6],[2275....|         2.0|                 2.0|
|(7,[0,1,6],[2511....|         3.0|                 3.0|
|(7,[0,1,6],[3197....|         2.0|                 2.0|
|(7,[0,1,6],[5965....|         2.0|                 2.0|
|(7,[0,1,6],[6951....|         2.0|                 2.0|
|(7,[0,1,6],[7957....|         2.0|                 2.0|
|(7,[0,1,6],[8095....|         3.0|                 3.0|
|(7,[0,1,6],[8149....|         4.0|                 4.0|
|(7,[0,1,6],[8642....|         2.0|                 2.0|
|(7,[0,1,6],[8842....|         

22/06/21 01:57:32 WARN TaskSetManager: Stage 10 contains a task of very large size (1342 KiB). The maximum recommended task size is 1000 KiB.
