In [1]:
import os
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Tell PySpark which Python executables to use (worker interpreter, driver
# front end and the options passed to that front end).
os.environ.update({
    'PYSPARK_PYTHON': '/usr/bin/python3',
    'PYSPARK_DRIVER_PYTHON': 'ipython3',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'notebook',
})

In [2]:
def stopCtx():
    """Stop the notebook-level Spark context ``sc1`` if it exists.

    This is a best-effort helper and never raises. It prints one of:

    * ``Spark Context stopped`` when ``sc1.stop()`` succeeded,
    * ``No context found: ...`` when the global ``sc1`` has not been
      defined yet (e.g. on a fresh kernel),
    * ``Failed to stop Spark Context: ...`` when ``sc1`` exists but
      ``stop()`` raised, so a real failure is not reported as a missing
      context.
    """
    try:
        context = sc1
    except NameError as ex1:
        # The global has not been created yet; nothing to stop.
        print('No context found: %s' % str(ex1))
        return

    try:
        context.stop()
    except Exception as ex1:
        # Keep the notebook running, but report what actually went wrong.
        print('Failed to stop Spark Context: %s' % str(ex1))
    else:
        print("Spark Context stopped")


# Make sure no context from a previous run is still alive.
stopCtx()

No context found: name 'sc1' is not defined


In [3]:
# Create (or reuse) the Spark session. The application name comes from the
# JUPYTERHUB_CLIENT_ID environment variable and one executor instance is
# requested.
spark1 = (
    SparkSession.builder
    .config("spark.app.name", os.environ['JUPYTERHUB_CLIENT_ID'])
    .config("spark.executor.instances", "1")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext for the RDD cells and stopCtx().
sc1 = spark1.sparkContext

print(spark1)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/22 21:43:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<pyspark.sql.session.SparkSession object at 0x7fc790dd03d0>


In [4]:
# Load the tab-separated data set from HDFS; the first line provides the
# column names.
df = spark1.read.csv(
    "hdfs://namenode:19000/data.tsv",
    header=True,
    sep='\t',
)
df.printSchema()

                                                                                

root
 |-- measurement_count: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- version: string (nullable = true)
 |-- co2_ppm: string (nullable = true)
 |-- temperature_celsius: string (nullable = true)
 |-- relative_humidity_percent: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [5]:
# Convert "temperature_celsius", "co2_ppm" and "relative_humidity_percent"
# to float, rounded to 2 decimal places.
#
# The Spark functions are accessed through the `F` namespace so that Spark's
# `round` does not replace Python's builtin `round` in the notebook namespace.
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType


def _as_rounded_float(frame, column, digits=2):
    """Return `frame` with `column` cast to FloatType and rounded to `digits` decimals."""
    return frame.withColumn(column, F.round(frame[column].cast(FloatType()), digits))


df2 = _as_rounded_float(df, "temperature_celsius")
df3 = _as_rounded_float(df2, "co2_ppm")
df4 = _as_rounded_float(df3, "relative_humidity_percent")
df4.printSchema()
df4.show()

root
 |-- measurement_count: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- version: string (nullable = true)
 |-- co2_ppm: float (nullable = true)
 |-- temperature_celsius: float (nullable = true)
 |-- relative_humidity_percent: float (nullable = true)
 |-- timestamp: string (nullable = true)

+-----------------+--------------------+-------+-------+-------------------+-------------------------+----------+
|measurement_count|       serial_number|version|co2_ppm|temperature_celsius|relative_humidity_percent| timestamp|
+-----------------+--------------------+-------+-------+-------------------+-------------------------+----------+
|              200|s_d8bfc014724e_26...|  0.9.8|  420.0|               23.0|                     36.0|1617271728|
|              458|s_10521c0202ab_28...|  0.9.8|  421.0|               24.0|                     32.0|1617271736|
|              459|s_10521c0202ab_28...|  0.9.8|  420.0|               24.0|                     32.0|1617

In [6]:
# Convert the "timestamp" column (epoch seconds read as strings) to a
# timestamp type.
#
# Only the names that are used are imported: a star import of
# pyspark.sql.functions would replace Python builtins such as sum, min, max
# and round in the notebook namespace.
from pyspark.sql.functions import col, from_unixtime, to_timestamp
df5 = df4.withColumn("timestamp", to_timestamp(from_unixtime(col("timestamp"))))
df5.printSchema()

root
 |-- measurement_count: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- version: string (nullable = true)
 |-- co2_ppm: float (nullable = true)
 |-- temperature_celsius: float (nullable = true)
 |-- relative_humidity_percent: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [7]:
# Load the data into an RDD of raw text lines with textFile(path)
rddFromTSV = sc1.textFile(
    "hdfs://namenode:19000/data.tsv"
)
rddFromTSV.take(5)

                                                                                

['"measurement_count"\t"serial_number"\t"version"\t"co2_ppm"\t"temperature_celsius"\t"relative_humidity_percent"\t"timestamp"',
 '"200"\t"s_d8bfc014724e_262793"\t"0.9.8"\t"420"\t"23.0"\t"36.0"\t"1617271728"',
 '"458"\t"s_10521c0202ab_284839"\t"0.9.8"\t"421"\t"24.0"\t"32.0"\t"1617271736"',
 '"459"\t"s_10521c0202ab_284839"\t"0.9.8"\t"420"\t"24.0"\t"32.0"\t"1617271739"',
 '"113"\t"s_e8db84c5f771_300390"\t"0.9.8"\t"651"\t"20.0"\t"44.0"\t"1617271761"']

In [8]:
# Split every line into its tab-separated columns and remove the header row
rdd = rddFromTSV.map(lambda raw_line: raw_line.split("\t"))

# The first record is the header; keep every record that differs from it.
headers = rdd.first()
rdd = rdd.filter(lambda fields: fields != headers)

rdd.take(5)

                                                                                

[['"200"',
  '"s_d8bfc014724e_262793"',
  '"0.9.8"',
  '"420"',
  '"23.0"',
  '"36.0"',
  '"1617271728"'],
 ['"458"',
  '"s_10521c0202ab_284839"',
  '"0.9.8"',
  '"421"',
  '"24.0"',
  '"32.0"',
  '"1617271736"'],
 ['"459"',
  '"s_10521c0202ab_284839"',
  '"0.9.8"',
  '"420"',
  '"24.0"',
  '"32.0"',
  '"1617271739"'],
 ['"113"',
  '"s_e8db84c5f771_300390"',
  '"0.9.8"',
  '"651"',
  '"20.0"',
  '"44.0"',
  '"1617271761"'],
 ['"202"',
  '"s_d8bfc014724e_262793"',
  '"0.9.8"',
  '"422"',
  '"23.0"',
  '"36.0"',
  '"1617271769"']]

In [9]:
# How many sensors does the data set contain?
# Column 1 is the serial number; count its distinct values.
rdd_distinct_serial = rdd.map(lambda fields: fields[1])
sensor_count = rdd_distinct_serial.distinct().count()
print("Sensoren : "+ str(sensor_count))



Sensoren : 22


                                                                                

In [10]:
# How many data points are there per sensor?
# Emit (serial_number, 1) for every row and sum the ones per key.
rdd_anzahl_serial_number = rdd.map(lambda fields: (fields[1], 1))
rdd_anzahl_serial_number.reduceByKey(lambda left, right: left + right).collect()

                                                                                

[('"s_e8db84c5f33d_281913"', 2269085),
 ('"s_10521c01cf19_262520"', 385103),
 ('"s_e8db84c5f33d_0xdeadbeef"', 1515),
 ('"s_8caab57cc961_284337"', 1131861),
 ('"s_8caab57cc961_"', 7),
 ('"s_8caab57c3e19_"', 1),
 ('"s_8caab57a6dd9_"', 6),
 ('"s_3c6105d3abae_"', 2),
 ('"s_8caab57a6dd9_288065"', 2781611),
 ('"s_e8db84c5f33d"', 83),
 ('"s_3c6105d3abae_299589"', 1533917),
 ('"s_8caab57c3e19_282028"', 1561045),
 ('"s_d8bfc014724e_262793"', 2103522),
 ('"s_8caab57a6dd9"', 11),
 ('"s_e8db84c5f33d_"', 13),
 ('"s_e8db84c5f771_"', 2),
 ('"s_10521c0202ab_284839"', 2064),
 ('"s_8caab57a6dd9_0xdeadbeef"', 49),
 ('"s_e8db84c5f771_300390"', 1665528),
 ('"s_10521c01cf19_"', 2),
 ('"s_d8bfc0147061_283903"', 578456),
 ('"s_d8bfc0147061_"', 1)]

In [11]:
# What is the highest and what is the lowest temperature value?
# This cell computes the highest one.
#
# Column 4 is the temperature as quoted text. Surrounding spaces and quotes
# are removed, then missing readings (empty or the literal token 'null') are
# dropped. The token is compared as a whole: str.strip('null') would instead
# strip the individual characters n, u and l from both ends of every value.
rdd_max_temp = (
    rdd.map(lambda fields: fields[4].strip(' "'))
    .filter(lambda raw: raw not in ('', 'null'))
    .map(float)
)
print("Max Temperature: " + str(rdd_max_temp.max()))



Max Temperature: 36.0


                                                                                

In [12]:
# Lowest temperature value.
#
# Column 4 is the temperature as quoted text. Surrounding spaces and quotes
# are removed, then missing readings (empty or the literal token 'null') are
# dropped. The token is compared as a whole: str.strip('null') would instead
# strip the individual characters n, u and l from both ends of every value.
rdd_min_temp = (
    rdd.map(lambda fields: fields[4].strip(' "'))
    .filter(lambda raw: raw not in ('', 'null'))
    .map(float)
)
print("Min Temperature: " + str(rdd_min_temp.min()))



Min Temperature: -1.0


                                                                                

In [13]:
# What is the average CO2 value (co2_ppm) per sensor?
#
# Build (serial_number, co2_ppm) pairs from columns 1 and 3. Surrounding
# spaces and quotes are removed from the reading, and missing readings (empty
# or the literal token 'null') are dropped. The token is compared as a whole:
# str.strip('null') would strip the characters n, u and l instead.
rdd_avg_co2ppm = (
    rdd.map(lambda fields: (fields[1], fields[3].strip(' "')))
    .filter(lambda pair: pair[1] not in ('', 'null'))
    .mapValues(float)
    # Accumulate a (sum, count) pair per sensor.
    .combineByKey(
        lambda value: (value, 1),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
)
# mean = sum / count
mean_co2ppm = rdd_avg_co2ppm.mapValues(lambda acc: acc[0] / acc[1])
mean_co2ppm.collect()

                                                                                

[('"s_e8db84c5f33d_281913"', 531.2104615783285),
 ('"s_10521c01cf19_262520"', 500.67781606479303),
 ('"s_e8db84c5f33d_0xdeadbeef"', -1.0),
 ('"s_8caab57cc961_284337"', 470.7930161035675),
 ('"s_8caab57cc961_"', 429.2857142857143),
 ('"s_3c6105d3abae_"', 832.0),
 ('"s_8caab57a6dd9_"', 983.6666666666666),
 ('"s_8caab57c3e19_"', 620.0),
 ('"s_8caab57a6dd9_288065"', 654.238815923578),
 ('"s_e8db84c5f33d"', 1403.2289156626507),
 ('"s_3c6105d3abae_299589"', 833.9766714887442),
 ('"s_d8bfc014724e_262793"', 770.4554948583827),
 ('"s_8caab57c3e19_282028"', 876.9315842912921),
 ('"s_e8db84c5f33d_"', 488.7692307692308),
 ('"s_10521c0202ab_284839"', 436.76356589147287),
 ('"s_e8db84c5f771_"', 863.0),
 ('"s_8caab57a6dd9_0xdeadbeef"', -1.0),
 ('"s_e8db84c5f771_300390"', 954.0877634759553),
 ('"s_10521c01cf19_"', 467.5),
 ('"s_d8bfc0147061_283903"', 628.6339168406931),
 ('"s_d8bfc0147061_"', 722.0)]

In [14]:
# How many data points have a missing CO2 value?
#
# A reading counts as missing when, after removing surrounding spaces and
# quotes, it is empty or the literal token 'null'. The result gets its own
# name so the per-sensor (sum, count) RDD in rdd_avg_co2ppm is not overwritten.
rdd_missing_co2ppm = (
    rdd.map(lambda fields: fields[3].strip(' "'))
    .filter(lambda raw: raw in ('', 'null'))
)
print("Missing co2ppm values: " + str(rdd_missing_co2ppm.count()))



Missing co2ppm values: 19


                                                                                

<b>Stop context</b> 

In [15]:
# Stop the Spark context (sc1) now that all queries have run.
stopCtx()

Spark Context stopped
