In [0]:
spark

#### Import niezbędnych bibliotek.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### Załadowanie danych

In [0]:
df = spark.read.csv('/FileStore/tables/dyp/dane_end3.csv',inferSchema=True,header=True,sep=";")

In [0]:
df.printSchema()

In [0]:
df.display()

In [0]:
print("Liczba rekordów do analizy: ",df.count())

#### Sprawdzenie i usunięcie duplikatów wg. pola `PESEL_ANO`

Ponieważ określona osoba może być zatrudniania wielokrotnie a z wywiadu z pracownikami Firmy wynika, że w takich przypadkach w większości są to umowy na okres krótszy niż 3 miesiące i zawierane w celu czasowego zapotrzebowania na pracowników (sezonowość), to zdecydowano o usunięciu tego typu rekordów z dalszej analizy.

In [0]:
df_duplikat = df.join(
    df.groupBy('PESEL_ANO').agg((count("*")>1).cast("int").alias("Duplikat")),
    on='PESEL_ANO',
    how="inner"
)
df_nodup = df_duplikat.filter(df_duplikat['Duplikat'] == 0)
print("Liczba rekordów po usunięciu duplikatów: ",df_nodup.count())
df = df_nodup

In [0]:
df_duplikat.select('PESEL_ANO', 'Duplikat').show()

#### Wybór zmiennych do analizy na podstawie wiedzy eksperckiej

Część z dostarczonych przez Firmę danych nie wnosi wartości dodanej do dalszej analizy i z tego względu dane te zostaną usuniete z df.

In [0]:
df.columns

In [0]:
df = df.select('WIEK', 'WYK', 'PLEC', 'WOJE', 'UMOCZAS', 'JEDN', 'GRUZAW', 'PGRUZAW', 'ZATRODZ', 'ETAT' ,'WYNAGR','M3')
df.limit(5).toPandas()

In [0]:
df.summary().display()

#### Sprawdzenie wartości NaN i Null

In [0]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show(vertical=True)

In [0]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show(vertical=True)

Wartości `null` wystąpiły w zmiennych "WOJE" i "UMOCZAS". Ze względu na brak możliwości uzupełnienia ww. braków innymi wartościami w sposób zapewniający logiczną spójność z pozostałymi wartościami zmiennej, zdecydowano o usunięciu rekordów z brakami z dalszej analizy.

In [0]:
df_nonull = df.filter(df['WOJE'].isNotNull())
print("Liczba rekordów po usunięciu wrtości 'null': ",df_nonull.count())

In [0]:
df_nonull.select([count(when(col(c).isNull(), c)).alias(c) for c in df_nonull.columns]).display()

In [0]:
print("Liczba rekordów po usunięciu wrtości 'null': ",df_nonull.count())

In [0]:
df = df_nonull

#### Elementy EDA

In [0]:
df.groupBy('WOJE').count().orderBy('count', ascending=False).display()

W badanym okresie najliczniejszymi, zatrudnianymi grupami pracowników były osoby zamieszkujące województwa: mazowieckie, śląskie, wielkopolskie i dolnośląskie.

In [0]:
spark.read.csv('/FileStore/tables/dyp/dane_end3.csv',inferSchema=True,header=True,sep=";").select('MZ_ANO')\
.groupBy('MZ_ANO').count().orderBy('count', ascending=False).limit(30).display()
#df.groupBy('MZ_ANO').count().orderBy('count', ascending=False).limit(30).display()

Najwięcej pracowników pochodziło z miejscowości m9137.

In [0]:
import seaborn as sns

In [0]:
df1 = spark.read.csv('/FileStore/tables/dyp/dane_end3.csv',inferSchema=True,header=True,sep=";")
df1 = df1.drop('PESEL_ANO')
df_pandas = df1.toPandas()

In [0]:
df1.select('WIEK').display()

In [0]:
df_pandas.boxplot('WIEK')

In [0]:
df1.select('WIEK').describe().show()

In [0]:
sns.set_theme(style="whitegrid")
sns.boxplot(x="WIEK", data=df_pandas)

Średnia wieku pracowników zatrudnianych w badanym okresie wynosiła 35 lat, przy czym najmłodsza osoba miała 18 lat a najstarsza 77. Z uwagi na sędziwy wiek najstarszego pracownika, zasadna jest weryfikacja, czy nie jest to wartość błędna. Potwierdza to również wykres pudelkowy, na którym widać wartości odstające.

In [0]:
df.select('WIEK').filter(df['WIEK']>65).count()

In [0]:
sns.set_theme(style="whitegrid")
sns.boxplot(x="WIEK", y="PLEC", data=df_pandas)

In [0]:
#df1.select('GRUZAW').agg(min('GRUZAW'),max('GRUZAW')).show()
df1.select('GRUZAW').groupBy('GRUZAW').count().count()

In [0]:
df1.select('M3','GRUZAW').filter(df['M3'] == 1).groupBy('GRUZAW').count().orderBy('count',ascending=False).display()

Spośród osób, które pracowały krócej niż 3 miesiące (M3=1) najwięcej było osób z grup zawodowych o numerach 14, 8 i 13.

In [0]:
df_pandas.columns

In [0]:
df1.select('PLEC').groupBy('PLEC').count().display()

W badanym okresie zatrudnianych było więcej kobiet niż mężczyzn.

In [0]:
df1.select('M3','PLEC').display()

Wśród grupy osób, które przepracowały więcej niż 3 miesiące (M3=0), większość stanowiły kobiety. Wśród grupy osób, które przepracowały mniej niż 3 miesiące (M3=1), większość stanowili mężczyźni.

In [0]:
df1.select('M3', 'WYNAGR').display()

Wśród grupy osób, które przepracowały mniej niż 3 miesiące (M3=1), większość było osób z wynagrodzeniem z zakresu od Grade01 do Grade09 a najwiecej z Grade05.

#### Kategoryzacja zmiennych

Algorytmy biblioteki MLlib wymagaja określonej struktury danych wsadowych do przetwarzania. W związku z tym zmienne objaśniające typu dyskretnego (opisowe) należy odpowiednio przetworzyć i zmienić na wartości numeryczne. Zmienne typu ciagłego na tym etapie pozostaja bez zmian.

In [0]:
df.printSchema()

In [0]:
df_ml = df 

In [0]:
# mimo, że zmienne 'GRUZAW' i 'PGRUZAW' maja postać numeryczną i wyglądają na ciągłe, to tak naprawdę sa zmiennymi dykretnymi, gdzie numery sa nazwami klas przyporządkowania

In [0]:
df_ml = df_ml.withColumn('GRUZAW', df_ml['GRUZAW'].cast(StringType()))
df_ml = df_ml.withColumn('PGRUZAW', df_ml['PGRUZAW'].cast(StringType()))

In [0]:
df_ml.printSchema()

Zmienne objaśniające ostatecznie przybierają postać wektora nazywanego `features`  a zmienna objaśnia to wektor `label`.

In [0]:
# input_columns -  lista kolumn do utworzenia zmiennej `features`
drop_col = 'M3' 
input_columns = df_ml.select([cols for cols in df_ml.columns if cols not in drop_col]).columns
input_columns

In [0]:
# indexed - robocza DataFrame, w której utworzone zostaną zmienne `features` i `label`

indexed = df_ml.withColumn('label',df_ml['M3'])

In [0]:
indexed.printSchema()

In [0]:
print(input_columns)

Zamiana typu z opisowego na numeryczny dla zmiennych kategorialnych.

In [0]:
numeric_inputs = []
string_inputs = []
for column in input_columns:
    if str(indexed.schema[column].dataType) == 'StringType':
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num")
        encoder = OneHotEncoder(inputCol=column+"_num", outputCol=column+"_vec")
        indexed = indexer.fit(indexed).transform(indexed)
        indexed = encoder.fit(indexed).transform(indexed)
        new_col_name = column+"_vec" 
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)

In [0]:
print(numeric_inputs)
print(string_inputs)

##### Skośność i redukcja wartości odstających 

0.98999

In [0]:
d = {}
for col in numeric_inputs:
    d[col] = indexed.approxQuantile(col,[0.015,0.985],0.01)
d

In [0]:
# for col in numeric_inputs:
#     skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
#     print(skew[0][0])

In [0]:
numeric_inputs_skew = []
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect()
    skew = skew[0][0]
    print(f"Skośność zmiennej {col} wynosi: {skew}")
    if skew > 0.5:
        indexed = indexed.withColumn(col+'_skew', \
        log(when(indexed[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        new_col_name = col+"_skew" 
        numeric_inputs_skew.append(new_col_name)
        print(f"Zmienna '{col}'' została uznana jako prawoskośna i dokonano jej transformacji poprzez funkcję 'log()'.")
    elif skew < -0.5:
        indexed = indexed.withColumn(col+'_skew', \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        new_col_name = col+"_skew" 
        numeric_inputs_skew.append(new_col_name)
        print(f"Zmienna '{col}' została uznana jak0 lewoskośna i dokonano jej transformacji poprzez funkcję 'exp()'.")
    else:
        print(f"Dla zmiennej '{col}' skośność jest w normie.")

In [0]:
indexed.select('WIEK','WIEK_skew').show()

In [0]:
print(numeric_inputs_skew)
print(numeric_inputs)

#### Wektoryzacja zmiennych

In [0]:
features_list = numeric_inputs_skew + string_inputs
print(features_list)

In [0]:
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
output = assembler.transform(indexed)
final_data = output.select('features','label')

In [0]:
final_data.show(5,truncate=False)

##### Badanie równowagi klas zmiennej objaśnianej (`label`)

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplitModel

In [0]:
lr = LogisticRegression()
grid_tvs = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator_tvs = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid_tvs, evaluator=evaluator_tvs, parallelism=1)

tvsModel = tvs.fit(final_data)

tvs_ratio = tvsModel.getTrainRatio()
print(f"Współczynnik podziału train-test: {tvs_ratio}")

In [0]:
final_data.groupBy('label').count().show()

#### Zapisanie final_data do pliku `parquet`
Plik final_data można zachować jako etap prac i przy kolejnym otwarciu notebooka wczytać juz dane przygotowane do ML.

In [0]:
#final_data.write.mode('overwrite').parquet('/FileStore/tables/dyp/final_data/')

#### Odczyt danych z pliku `parquet`

In [0]:
#final_data = spark.read.parquet('/FileStore/tables/dyp/final_data/') # wczytanie danych do ML

In [0]:
# final_data.count()

In [0]:
# final_data.show()

#### Podział na zbiory trenujący i testowy
Zbiór trenujący to zestaw losowo wybranych obserwacji, który służy do trenowania i budowy modelu danych wg. określonego algorytmu. Zbiór testowy to pozostale dane ze zbioru głównego , które służą do testowania i ewaluacji zbudowanego wcześniej modelu danych.

In [0]:
train, test = final_data.randomSplit([tvs_ratio,1-tvs_ratio])
print("Liczba obserwacji w zbiorze trenujacym: ",train.count())
print("Liczba obserwacji w zbiorze testowym: ",test.count())

#### Regresja logistyczna (bez "cross-validation")

In [0]:
# przygotowanie obiektu do ewaluacji (oceny) modelu. W przypadku klasyfikacji binarnej wykorzystywana bedzie miara AUC.
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')

In [0]:
# budowa i trenowanie modelu danych przy standardowych ustawieniach parametrów
classifier = LogisticRegression()
fitModel = classifier.fit(train)

In [0]:
# detekcja najlepszego punktu odcięcia tj. granicznej wartości prawdopodobiństwa oddzielającej klasy 0 i 1, przy wykorzystanu F-score
trainingSummary = fitModel.summary

fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']

print("Najlepszy punkt odcięcia (Threshold): ",bestThreshold)

In [0]:
classifier = LogisticRegression(threshold=bestThreshold)
fitModel = classifier.fit(train)

In [0]:
# testowanie i ewaluacja modelu danych
prediction_lr = fitModel.transform(test)
auc_lr = Bin_evaluator.evaluate(prediction_lr)
print("Test AUROC: ", auc_lr)

In [0]:
prediction_lr.show()

In [0]:
trainSummary = fitModel.summary

# Obtain the objective per iteration
objectiveHistory = trainSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainSummary.roc.display()
print()
print("Train AUROC: " + str(trainingSummary.areaUnderROC))

#### Regresja logistyczna (z "cross-validation")

In [0]:
classifier_cv = LogisticRegression(threshold=bestThreshold)

paramGrid = (ParamGridBuilder().addGrid(classifier_cv.maxIter, [15,20]).build())

crossval = CrossValidator(estimator=classifier_cv,
                          estimatorParamMaps=paramGrid,
                          evaluator=Bin_evaluator,
                          numFolds=3)

fitModel_cv = crossval.fit(train)

BestModel = fitModel_cv.bestModel

LR_BestModel = BestModel

predictions_cv = fitModel_cv.transform(test)

auc_cv = Bin_evaluator.evaluate(predictions_cv)
print("Test AUC dla LogRef z cross-val: ", auc_cv)

In [0]:
# ekstrakcja współczynników
coeff_array = BestModel.coefficientMatrix.toArray()
coeff_scores = []
for x in coeff_array[0]:
    coeff_scores.append(float(x))

data_schema = [StructField("feature", StringType(), True),StructField("coeff", DoubleType(), True)]
final_struc = StructType(fields=data_schema)
result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=final_struc)

result.orderBy('coeff', ascending=False).show(truncate=False)

#### Drzewa decyzyjne i Lasy losowe (JP)

In [0]:
dtc = DecisionTreeClassifier() # klasyfikator dla drzewa decyzyjnego
rfc = RandomForestClassifier(numTrees=100) # klasyfikator dla lasu losowego

dtc_model = dtc.fit(train) # budowa i trenowanie modelu drzewa decyzyjnego 
rfc_model = rfc.fit(train) # budowa i trenowanie modelu lasu losowego

dtc_preds = dtc_model.transform(test) # prognozowanie wartości zmiennej objaśnianej przez model drzewa decyzyjnego
rfc_preds = rfc_model.transform(test) # prognozowanie wartości zmiennej objaśnianej przez model lasu losowego

# ocena predykcji przez model drzewa decyzyjnego
auc_dtc = Bin_evaluator.evaluate(dtc_preds)
print("Decision Tree AUC: ", auc_dtc)

# ocena predykcji prze model lasu losowego
auc_rfc = Bin_evaluator.evaluate(rfc_preds)
print("Random Rorest AUC: ", auc_rfc)