####Big Data Analysis  
Drugi deo projekta u kome se zahteva definisanje i analiza hipoteza  
Bići obrađeni transformisani podaci o uticaju stresa na čoveka

Učitavanje transformisanih podataka u Notebook

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
all_data_df_path = 'dbfs:/stresTransformacije/df.csv'

all_data_df = spark.read.csv(all_data_df_path, header=True, inferSchema=True)

In [0]:
all_data_df.cache()
all_data_df.printSchema()
all_data_df.show()

root
 |-- hrkanje: double (nullable = true)
 |-- brzina_disanja: double (nullable = true)
 |-- temperatura_tela: double (nullable = true)
 |-- kiseonik_u_krvi: double (nullable = true)
 |-- dužina_sna: double (nullable = true)
 |-- puls: double (nullable = true)
 |-- stres: integer (nullable = true)
 |-- vlažnost: double (nullable = true)
 |-- broj_koraka: integer (nullable = true)
 |-- nivo_stresa: integer (nullable = true)

+-------+--------------+----------------+---------------+----------+-----+-----+--------+-----------+-----------+
|hrkanje|brzina_disanja|temperatura_tela|kiseonik_u_krvi|dužina_sna| puls|stres|vlažnost|broj_koraka|nivo_stresa|
+-------+--------------+----------------+---------------+----------+-----+-----+--------+-----------+-----------+
|   60.0|          20.0|            96.0|           95.0|       7.0| 60.0|    1|   15.97|         51|          1|
|  85.76|        23.536|          90.768|         88.768|     0.768|68.84|    3|   15.97|         51|          1|


####Vizualizacija data set-a

In [0]:
display(all_data_df)

hrkanje,brzina_disanja,temperatura_tela,kiseonik_u_krvi,dužina_sna,puls,stres,vlažnost,broj_koraka,nivo_stresa
60.0,20.0,96.0,95.0,7.0,60.0,1,15.97,51,1
85.76,23.536,90.768,88.768,0.768,68.84,3,15.97,51,1
56.88,19.376,95.376,94.064,6.376,58.44,1,15.97,51,1
55.52,19.104,95.104,93.656,6.104,57.76,1,15.97,51,1
73.44,21.344,93.344,91.344,4.016,63.36,2,15.97,51,1
59.28,19.856,95.856,94.784,6.856,59.64,1,15.97,51,1
87.8,24.08,91.04,89.04,1.04,70.2,3,15.97,51,1
52.32,18.464,94.464,92.696,5.464,56.16,1,15.97,51,1
52.64,18.528,94.528,92.792,5.528,56.32,1,15.97,51,1
86.24,23.664,90.832,88.832,0.832,69.16,3,15.97,51,1


Broj osoba sa stresom grupisan po nivoima stresa

In [0]:
display(all_data_df.select('stres').groupBy('stres').count().orderBy('count', ascending=False))

stres,count
2,162640
1,161350
3,121335


Output can only be rendered in Databricks

Broj koraka grupisan po nivou stresa

In [0]:
display(all_data_df.select('stres', 'broj_koraka').where(col('stres') == 2)
       .groupBy('stres', 'broj_koraka')
       .count().orderBy('count', ascending=False))

stres,broj_koraka,count
2,125,1386
2,127,1386
2,126,1386
2,124,1386
2,123,1386
2,128,1386
2,122,1386
2,183,1260
2,192,1260
2,107,1260


Output can only be rendered in Databricks

Broj stresa grupisan u odnosu na dužinu spavanja

In [0]:
display(all_data_df.select('stres').where(col('dužina_sna') < 4)
       .groupBy('stres')
       .count().orderBy('count', ascending=False))

stres,count
3,121335
2,108427


Output can only be rendered in Databricks

Količina stresa u momentu kada se ubrza disanje

In [0]:
display(all_data_df.select('stres', 'brzina_disanja')
       .where(col('brzina_disanja') > 0)
       .groupBy('stres').count().orderBy('count'), ascending=False)

stres,count
3,121335
1,161350
2,162640


Output can only be rendered in Databricks

Količina vlažnosti grupisana po nivou stresa

In [0]:
display(all_data_df.select('stres', 'vlažnost')
       .where(col('vlažnost') > 20)
       .groupBy('stres').count().orderBy('count'), ascending=False)

stres,count
3,93981
1,124975
2,125974


Output can only be rendered in Databricks

Uticaj glasnoće hrkanja na dužinu spavanja

In [0]:

display(all_data_df.groupBy("dužina_sna").agg({"hrkanje": "avg"}))

dužina_sna,avg(hrkanje)
4.592,77.28000000000011
5.48,52.3999999999998
0.032,80.24000000000004
6.936,59.68000000000017
5.608,53.04000000000026
4.88,79.2000000000003
6.44,57.20000000000018
0.0,80.0
3.368,69.12000000000032
3.776,71.84000000000012


Output can only be rendered in Databricks

Količina kiseonika grupisana po pulsu

In [0]:
display(all_data_df.select('kiseonik_u_krvi', 'puls')
       .where(col('puls') > 50)
       .groupBy('kiseonik_u_krvi').count().orderBy('count'), ascending=False)

kiseonik_u_krvi,count
93.56,1290
91.92,1290
90.32,1290
89.36,1290
88.56,1290
88.0,1290
88.32,1290
91.68,1290
88.8,1290
92.36,1290


Output can only be rendered in Databricks

Temperatura tela grupisana po vlažnosti

In [0]:
display(all_data_df.select('temperatura_tela')
        .where(col('vlažnost') > 0)
        .groupBy('temperatura_tela')
        .count()
        .orderBy('count', ascending=False))

temperatura_tela,count
94.704,1291
90.336,1291
93.424,1291
93.216,1291
90.864,1291
95.856,1291
93.792,1291
92.256,1291
95.808,1291
95.328,1291


Output can only be rendered in Databricks

Broj koraka grupisan po vlažnosti

In [0]:
display(all_data_df.select('broj_koraka', 'vlažnost')
        .where(col('vlažnost') > 20)
        .groupBy('broj_koraka')
        .count()
        .orderBy('count', ascending=False))

broj_koraka,count
137,3450
193,3450
192,3450
185,3450
169,3450
161,3450
162,3450
130,3450
194,3450
145,3450


Output can only be rendered in Databricks

Količina kiseonika u krvi grupisana po stresu

In [0]:
display(all_data_df.select('stres', 'kiseonik_u_krvi')
        .where(col('kiseonik_u_krvi') > 0)
        .groupBy('stres')
        .count()
        .orderBy('count', ascending=False))

stres,count
2,162640
1,161350
3,121335


Output can only be rendered in Databricks

##Analiza hipoteza

####Hipoteza 1  
Odnos stresa na dužinu sna.

In [0]:
display(all_data_df.select('stres')
       .where(col('dužina_sna') < 7)
       .groupBy('stres')
       .count()
       .orderBy('count', ascending=False))

stres,count
2,162640
1,160060
3,121335


Output can only be rendered in Databricks

Pored dužine sna potrebno je da vidimo kako stres utiče na puls i nivo kiseonika u krvi.

In [0]:
broj_stresiranih = (all_data_df.select('stres')
    .where(col('stres') > 0)
    .count())
print('Ukupan broj osoba sa stresom: ', broj_stresiranih)

broj_pulsa_kiseonika = (all_data_df.select('stres')
                 .where(col('puls') > 0)
                 .where(col('kiseonik_u_krvi') > 0)
                 .where(col('stres') == 3)
                 .count())

print('Ukupan broj osoba sa određenim pulsom i kiseonikom kod kojih je stres visok: ', broj_pulsa_kiseonika)

Ukupan broj osoba sa stresom:  445325
Ukupan broj osoba sa određenim pulsom i kiseonikom kod kojih je stres visok:  121335


In [0]:
display(all_data_df.select('dužina_sna')
       .where(col('stres') == 3)
       .groupBy('dužina_sna')
       .count()
       .orderBy('count', ascending=False))

print('Prosek osoba kod kojih stres utiče na dužinu spavanja: ', broj_pulsa_kiseonika/broj_stresiranih*100, '%')

dužina_sna,count
0.032,1291
0.576,1291
1.168,1291
0.352,1291
1.104,1291
0.688,1291
0.416,1291
0.144,1291
1.184,1291
1.424,1291


Output can only be rendered in Databricks

Prosek osoba kod kojih stres utiče na dužinu spavanja:  27.246393083702912 %


Prethodno pokazanom analizom dolazim do zaključka da od ukupno 445325 osoba koje imaju problema sa stresom njih 121335 ima vrlo visok novi stresa što pored spavanja dodatno utiče na povećanje pulsa kao i kiseonika u krvi. U procentima to iznosi oko 27%.  
Dakle ovim bismo mogli reći da stres utiče na dužinu sna i samim tim je hipoteza <b>tačna</b>.

####Hipoteza 2  
Nivo stresa je visok što je veća aktivnost.

In [0]:
display(all_data_df.select('stres')
       .where(col('broj_koraka') > 0)
       .groupBy('stres')
       .count()
       .orderBy('count', ascending=False))



stres,count
2,162640
1,161350
3,121335


Output can only be rendered in Databricks

Proverićemo još da li vlažnost utiče na aktivnost i brzinu disanja kod osoba sa stresom.  
Napravio bih još dva data frame-a. Jedan data frame bi se odnosio na broj aktivnosti kada su uključeni parametri vlažnosti. Dok drugi data frame bi pokazao broj aktivnosti prouzrokovan brzinom disanja.

In [0]:
broj_aktivnosti_vl = (all_data_df.select('stres', 'broj_koraka')
                     .where(col('stres') == 3)
                     .where(col('vlažnost') > 25))

broj_aktivnosti_vl = (broj_aktivnosti_vl.groupBy('broj_koraka')).count()
broj_aktivnosti_vl = (broj_aktivnosti_vl.orderBy('broj_koraka')
                      .withColumnRenamed('count', 'broj_aktivnosti_vl'))

display(broj_aktivnosti_vl)

broj_koraka,broj_aktivnosti_vl
130,658
131,658
132,658
133,658
134,658
135,658
136,658
137,658
138,658
139,658


Output can only be rendered in Databricks

In [0]:
broj_aktivnosti_bd = (all_data_df.select('stres', 'broj_koraka')
                     .where(col('stres') == 3)
                     .where(col('brzina_disanja') > 0))

broj_aktivnosti_bd = (broj_aktivnosti_bd.groupBy('broj_koraka')).count()
broj_aktivnosti_bd = (broj_aktivnosti_bd.orderBy('broj_koraka')
                      .withColumnRenamed('count', 'broj_aktivnosti_bd'))

display(broj_aktivnosti_bd)

broj_koraka,broj_aktivnosti_bd
51,470
52,470
53,470
54,470
55,470
56,470
57,470
58,470
59,470
60,470


Output can only be rendered in Databricks

In [0]:
all_data_df.select(avg('broj_koraka').alias('prosek')).show()
broj_aktivnosti_vl.select(avg('broj_koraka').alias('prosek aktivnosti - vlažnost')).show()
broj_aktivnosti_bd.select(avg('broj_koraka').alias('prosek aktivnosti - disanje')).show()

+------------------+
|            prosek|
+------------------+
|134.41504743726492|
+------------------+

+----------------------------+
|prosek aktivnosti - vlažnost|
+----------------------------+
|                       165.0|
+----------------------------+

+---------------------------+
|prosek aktivnosti - disanje|
+---------------------------+
|                      125.5|
+---------------------------+



Dolazimo do zaključka da prvobitna prosečna aktivnost iznosi 134 koraka uzimajući u obzir celokupan data set. U što precizniju analizu podataka uključujemo parametre vlažnost i brzinu disanja, kojima bismo pokazali i još bolje utvrdili da li će zajedno sa njima naš član u hipotezi srazmerno rasti.  
Time smo zaključili da veća vlažnost nam pokazuje veći broj koraka pri visokom stresu, ali još jedna pretpostavka u analizi koju smo koristili nam daje rezultat pri kojem dobijamo znatno manji broj koraka od prosečnog.   
Na kraju možemo konstatovati da je hipoteza <b>netačna</b>

####Hipoteza 3  
Visok stres prouzrokuje povećanje temperature i smanjenje kiseonika u krvi.

Prikazaćemo broj osoba sa stresom u odnosu na temperaturu, kao i u odnosu na kiseonik u krvi.

In [0]:
broj_temp_df = (all_data_df.select('stres', 'temperatura_tela')
               .where(col('stres') > 0))
broj_temp_df = (broj_temp_df.groupBy(col('temperatura_tela')
                                     .alias('temperatura_tela')).count())

print('Broj osoba koda koje imaju stres: ', broj_temp_df.count())
display(broj_temp_df)

Broj osoba koda koje imaju stres:  345


temperatura_tela,count
94.704,1291
90.336,1291
93.424,1291
93.216,1291
90.864,1291
95.856,1291
93.792,1291
94.72,1290
92.256,1291
95.808,1291


Output can only be rendered in Databricks

In [0]:
broj_kis_df = (all_data_df.select('stres', 'kiseonik_u_krvi')
               .where(col('stres') > 0))
broj_kis_df = (broj_kis_df.groupBy(col('kiseonik_u_krvi')
                                     .alias('kiseonik_u_krvi')).count())

print('Broj osoba koda koje imaju stres: ', broj_kis_df.count())
display(broj_kis_df)

Broj osoba koda koje imaju stres:  345


kiseonik_u_krvi,count
91.904,1291
89.488,1291
90.336,1291
92.024,1291
90.864,1291
93.56,1290
88.848,1291
91.888,1291
89.328,1291
91.92,1290


Output can only be rendered in Databricks

In [0]:
broj_temp_visok_df = (all_data_df.select('stres', 'temperatura_tela')
               .where(col('stres') == 3))
broj_temp_visok_df = (broj_temp_visok_df.groupBy(col('temperatura_tela')
                                     .alias('temperatura_tela')).count())

print('Broj osoba koda koje imaju visok stres: ', broj_temp_visok_df.count())
display(broj_temp_visok_df)

Broj osoba koda koje imaju visok stres:  94


temperatura_tela,count
90.336,1291
90.864,1291
90.112,1291
90.832,1291
91.216,1291
90.752,1291
90.352,1291
90.016,1291
90.672,1291
90.32,1290


Output can only be rendered in Databricks

In [0]:
broj_kis_visok_df = (all_data_df.select('stres', 'kiseonik_u_krvi')
               .where(col('stres') == 3))
broj_kis_visok_df = (broj_kis_visok_df.groupBy(col('kiseonik_u_krvi')
                                     .alias('kiseonik_u_krvi')).count())

print('Broj osoba koda koje imaju visok stres: ', broj_kis_visok_df.count())
display(broj_kis_visok_df)

Broj osoba koda koje imaju visok stres:  94


kiseonik_u_krvi,count
89.488,1291
88.848,1291
89.328,1291
88.688,1291
88.112,1291
89.344,1291
88.528,1291
88.768,1291
89.36,1290
88.304,1291


Output can only be rendered in Databricks

In [0]:
broj_temp_df.select(avg('temperatura_tela').alias('prosek temperature')).show()
broj_temp_visok_df.select(avg('temperatura_tela').alias('prosek temperature - visok stres')).show()


+------------------+
|prosek temperature|
+------------------+
| 93.11285797101449|
+------------------+

+--------------------------------+
|prosek temperature - visok stres|
+--------------------------------+
|                          90.744|
+--------------------------------+



In [0]:
broj_kis_df.select(avg('kiseonik_u_krvi').alias('prosek kiseonika')).show()
broj_kis_visok_df.select(avg('kiseonik_u_krvi').alias('prosek kiseonika - visok stres')).show()

+-----------------+
| prosek kiseonika|
+-----------------+
|91.29546666666661|
+-----------------+

+------------------------------+
|prosek kiseonika - visok stres|
+------------------------------+
|             88.74399999999996|
+------------------------------+



Na osnovu analize podataka i izdvajanju uzoraka dolazimo do sledećeg:  
visok stres je uticao i na temperaturu i na nivo kiseonika u krvi i to na način takav da prosečan nivo kiseonika je veći od prosečnog nivoa kiseonika tokom visokog stresa.  
Međutim, kod temperature je identična situacija kao kod nivoa kiseonika u krvi, što se ne slaže sa početnom hipotezom i što nas dovodi do zaključka da je pretpostavka bila <b>netačna</b>.

##Primena algoritma mašinskog učenja

U nastavku analizu podataka ćemo odraditi primenom algoritma mašinskog učenja. Biće korišćen višeslojni perceptron (MLP) koji spada u kategoriju algoritama sa nadgledanim učenjem (supervised learning). Najčešće se koristi za regresiju i klasifikaciju podataka.  

Kroz više slojeva ćemo prikazati klasifikaciju podataka. Zamisao je da pored transformisanih podataka koji se učitavaju iz DBFS-a, postoje tri sloja koja će po nivoima imati različitu topologiju. Na samom kraju ćemo imati softmax funkciju koja će u fazi testiranja da izvuče najbolje rezultate koje ćemo dalje proslediti na izlaz.

Pre početka rada predstavićemo način obrade podataka kroz tri faze: treniranje, tesiranje i validacije.  
Za treniranje ćemo izdvojiti 80% uzoraka, dok ćemo testiranje i validaciju objediniti sa 20% uzoraka.  
Kod ciklusa treniranja kao funkciju aktivacije ćemo koristiti tanh (hiperbolički tangens). U fazi testiranja kao što sam pomenuo prmenićemo Softmax funkciju, koju ne moramo implicinto da navodimo, jer je u sparku ona određenim importima uključena (MultilayerPerceptronClassificationModel).

In [0]:
!pip install tensorflow

Collecting tensorflow
  Downloading tensorflow-2.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (588.3 MB)
[?25l[K     |                                | 10 kB 22.8 MB/s eta 0:00:26[K     |                                | 20 kB 12.3 MB/s eta 0:00:48[K     |                                | 30 kB 6.8 MB/s eta 0:01:27[K     |                                | 40 kB 4.4 MB/s eta 0:02:15[K     |                                | 51 kB 5.2 MB/s eta 0:01:54[K     |                                | 61 kB 5.6 MB/s eta 0:01:46[K     |                                | 71 kB 5.2 MB/s eta 0:01:54[K     |                                | 81 kB 5.2 MB/s eta 0:01:54[K     |                                | 92 kB 5.8 MB/s eta 0:01:42[K     |                                | 102 kB 4.9 MB/s eta 0:02:01[K     |                                | 112 kB 4.9 MB/s eta 0:02:01[K     |                                | 122 kB 4.9 MB/s eta 0:02:01[K     |               

U notebook je uvežena biblioteka TensorFlow, zbog potreba u fazi treniranja.

In [0]:
import tensorflow as tf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stresAnalizaPodataka").config("spark.jars.packages", "org.tensorflow:spark-tensorflow-connector_2.12:1.15.0").getOrCreate()

In [0]:
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("stresAnalizaPodataka").getOrCreate()

df = spark.read.format("csv").option("header", "true").load(all_data_df_path)



Učitavanje podataka i kreiranje df. Neophodno je bilo odraditi cast u narednom koraku.

In [0]:
df = df.withColumn('hrkanje', col('hrkanje').cast('double'))
df = df.withColumn('brzina_disanja', col('brzina_disanja').cast('double'))
df = df.withColumn('temperatura_tela', col('temperatura_tela').cast('double'))
df = df.withColumn('kiseonik_u_krvi', col('kiseonik_u_krvi').cast('double'))
df = df.withColumn('dužina_sna', col('dužina_sna').cast('double'))
df = df.withColumn('puls', col('puls').cast('double'))
df = df.withColumn('stres', col('stres').cast('int'))
df = df.withColumn('vlažnost', col('vlažnost').cast('double'))
df = df.withColumn('broj_koraka', col('broj_koraka').cast('int'))
df = df.withColumn('nivo_stresa', col('nivo_stresa').cast('int'))

Radi dobijanja što boljih rezultata prilikom ove analize odlučilo sam da odbacim sledeće dve kolone.

In [0]:
df = df.drop('vlažnost', 'nivo_stresa')

display(df)

hrkanje,brzina_disanja,temperatura_tela,kiseonik_u_krvi,dužina_sna,puls,stres,broj_koraka
60.0,20.0,96.0,95.0,7.0,60.0,1,51
85.76,23.536,90.768,88.768,0.768,68.84,3,51
56.88,19.376,95.376,94.064,6.376,58.44,1,51
55.52,19.104,95.104,93.656,6.104,57.76,1,51
73.44,21.344,93.344,91.344,4.016,63.36,2,51
59.28,19.856,95.856,94.784,6.856,59.64,1,51
87.8,24.08,91.04,89.04,1.04,70.2,3,51
52.32,18.464,94.464,92.696,5.464,56.16,1,51
52.64,18.528,94.528,92.792,5.528,56.32,1,51
86.24,23.664,90.832,88.832,0.832,69.16,3,51


In [0]:
#klasa koja nasleđuje tf.keras.Model
#potrebna nam je da bi se model mogao koristiti sa sparkom
class SparkModelWrapper(tf.keras.Model):
    def __init__(self, model):
        super(SparkModelWrapper, self).__init__()
        self.model = model

    def call(self, inputs):
        return self.model(inputs)
    
#kreiranje keras modela
keras_model = Sequential()
keras_model.add(Dense(64, activation='tanh', input_shape=(8,)))
keras_model.add(Dense(32, activation='tanh'))
keras_model.add(Dense(16, activation='tanh'))
keras_model.add(Dense(5, activation='tanh'))


In [0]:
#SparkModelWrapper služi za podešavanje keras modela
spark_model = SparkModelWrapper(keras_model)

#kompajliranje modela
sgd = SGD(learning_rate=0.001)
spark_model.compile(optimizer=sgd, loss='sparse_categorical_crossentropy', metrics=['accuracy'])


In [0]:
# kreiranje asembler vektora za spajanje kolona u jednu features kolonu
assembler = VectorAssembler(inputCols=["hrkanje", "brzina_disanja", "temperatura_tela", "kiseonik_u_krvi", "dužina_sna", "puls", "stres", "broj_koraka"], outputCol="features")

#dodajemo novu kolonu label koju smo izabrali kao ciljnu varijablu
#ona predstavlja nivo stresa čiji je sadržaj prikazan na pokačtku ovog notebook-a
df = df.withColumn("label", col("stres"))


In [0]:
#transformišemo izabrani data frame
transf = assembler.transform(df)

In [0]:
#ovim delimo set na podatke za treniranje koji čine 80% i podatke za testiranje i validaciju koji čine 20%
#(trainingData, testData) = transf.randomSplit([0.8, 0.2])
#trainingData.cache()

In [0]:
#treniranje modela
mlp = mlp = MultilayerPerceptronClassifier(featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=10, blockSize=800, layers=[8, 64, 32, 16, 5], stepSize=0.001)
model = mlp.fit(assembler.transform(df))

In [0]:
(trainingData, testData) = transf.randomSplit([0.8, 0.2])
trainingData.cache()

#testiranje modela
predictions = model.transform(testData)

In [0]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

Accuracy: 0.7112620793255756


Na samom kraju dobijamo tačnost našeg algoritma koja iznosi približno 72%.  
To nas dovodi do zaključka da smo postigli veoma visoku tačnost u odnosu na očekivanu koju smo postavili na početku analize, a ta vrednost je bila 80%.  
A samim tim možemo reći da kao rezultat celokupne analize svi faktori imaju udela u razvoju visokog nivoa stresa kod čoveka.  
Takođe, imam i sliku višeslojnog perceptrona koja nažalost nije mogla da stane u ovaj markdown, ali biće poslata uz sam rad.

<img src="https://drive.google.com/file/d/1GElHDM121XgunLusyR7_HsYkXPY7Be5s/view?usp=share_link" alt="Višeslojni perceptron" />

##Structured Streaming

In [0]:
%fs ls dbfs:/streaming/df.csv

path,name,size,modificationTime
dbfs:/streaming/df.csv/_SUCCESS,_SUCCESS,0,1676291040000
dbfs:/streaming/df.csv/_committed_3341368505843524836,_committed_3341368505843524836,113,1676291039000
dbfs:/streaming/df.csv/_started_3341368505843524836,_started_3341368505843524836,0,1676291036000
dbfs:/streaming/df.csv/part-00000-tid-3341368505843524836-1a17a0b3-ef84-4e77-bc79-aa901c29d024-538-1-c000.csv,part-00000-tid-3341368505843524836-1a17a0b3-ef84-4e77-bc79-aa901c29d024-538-1-c000.csv,22843777,1676291038000


In [0]:
all_str_path = 'dbfs:/streaming/'

schema = StructType([
     StructField("stres", IntegerType(), True),
     StructField('timestamp', StringType(), True),
    # StructField('count', LongType(), True)
])

In [0]:
from pyspark.sql.functions import *
str_df = (spark
            .read
            .schema(schema)   
            .json(all_str_path))

#broj_str = (str_df.groupBy('stres').count())
#broj_str = (str_df.groupBy(str_df.stres, window(str_df.timestamp, "1 hour")).count())
#broj_str.isStreaming
display(str_df)

DataFrame[stres: int, timestamp: string]

In [0]:
from pyspark.sql.functions import *    

count_DF = (str_df.groupBy(str_df.stres, window(str_df.timestamp, "1 hour")).count())
count_DF.cache()

display(count_DF)

DataFrame[stres: int, window: struct<start:timestamp,end:timestamp>, count: bigint]

In [0]:
count_DF.createOrReplaceTempView("static_counts")

In [0]:
%sql
select stres, sum(count) as total_count from static_counts group by stres

DataFrame[stres: int, total_count: bigint]

In [0]:
# keep the size of shuffles small
spark.conf.set("spark.sql.shuffle.partitions", "2")  

#query = (str_df
#    .writeStream
#   .format("memory")        
#    .queryName("kiseonik_u_krvi")    
#    .outputMode("append")  
#    .start())

In [0]:
from time import sleep
sleep(5)

In [0]:
%sql select stres, sum(count) from nivo_stresa group by stres

DataFrame[stres: int, sum(count): bigint]

In [0]:
query.stop()