<a href="https://colab.research.google.com/github/ChristianKitte/SparkProjekt/blob/main/notebook/Wordcount_mit_Spark_DataFrames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Vorbereitung des Notebooks

In [None]:
# Installation  von Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

print("Java ist installiert...")

# Download und Entpacken von Spark (Versionsnummer anpassen!)
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

print("Spark ist verfügbar...")

# Setzen der Systemvariablen für Java und Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

print("Umgebungsvariablen sind gesetzt...")

Java ist installiert...
Spark ist verfügbar...
Umgebungsvariablen sind gesetzt...


In [None]:
# Installation von findspark und pyspark

!pip install findspark
print("FindSpark wurde installiert...")

!pip install pyspark
print("PySpark wurde installiert...")

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2
FindSpark wurde installiert...
Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=5575945594cc16930e0a9a5b95ae22577df6eafd6426ae3369a759e38852cacc
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
PySpark wurde installiert.

In [None]:
# Initialisieren von findspark

try: 
  import findspark
  from pyspark.sql import SparkSession
  
  findspark.init()
  
  print("FindSpark und PySpark wurden initialisiert")
except ImportError: 
  raise ImportError("Fehler bei der Initialiserung von FindSpark und PySpark")

FindSpark und PySpark wurden initialisiert


# Einlesen und Vorbereiten der Textdatei

Im ersten Abschnitt werden zunächst zwei Methoden definiert.

* Der ersten Methode get_file_from_url werden als Parameter eine URL sowie ein Speicherort angegeben. Bei Ihrem Aufruf lädt die Methode eine Datei von der angegebenen URL herunter und speichert sie in Google Drive ab.

* Die zweite Methode cut_file nimmt als Parameter einen numerischen Start- und Endwert sowie die Angabe einer Quell- und Zieldatei entgegen. Bei Ihrem Aufruf entfernt die Methode alle Zeilen vor bzw. nach den durch Start- und Endwert definierten Zeilenbereich aus der Quelldatei und speichert das Ergebnis in die Zieldatei.

In dem folgenden Block wird dann im Anschluss die Datei mit den gesammelten Werken von Shakespeare von der Seite des MIT herunter geladen sowie von nicht benötigten Zeilen bereinigt und in einer neuen Datei gespeichert.

In [None]:
# Erstellen einer Methode, um Dateien aus dem Internet zu laden und zu speichern

import requests 

def get_file_from_url(file_url, place_to_save):
  try:
    req = requests.get(file_url, stream = True) 

    with open(place_to_save, "wb") as file: 
	    for block in req.iter_content(chunk_size = 1024): 
		    if block: 
			    file.write(block) 
     
    print("Die Datei wurde herunter geladen und angelegt: {}".format(file_url))
  
  except ValueError:
    print("Fehler {}".format(ValueError))   

print("Die Funktion get_file_from_url wurde angelegt...")


Die Funktion get_file_from_url wurde angelegt...


In [None]:
# Erstellen einer Methode, um eine Textdatei am Anfang und am Ende um die jeweils
# angegebene Zahl an Reihen zu beschneiden.

def cut_file(anfang, ende, quelldatei, zieldatei):
  try:
    with open(quelldatei, "r") as source:
      lines = source.readlines()
    
    source.close()

    print("")
    print("Start: {}".format(anfang))
    print("Ende: {}".format(ende))
    print("")

    current_count = 0
  
    with open(zieldatei, "w") as target:
      for line in lines:
        if current_count >= anfang and current_count <= ende:
          target.write(line)

        current_count = current_count + 1   
    
    target.close()

    print("Datei wurde beschnitten...")

  except ValueError:
    print("Fehler {}".format(ValueError))

print("Die Funktion cut_file wurde angelegt...")

Die Funktion cut_file wurde angelegt...


In [None]:
# Datei von der Quelle nach Colab laden

file_url = "https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt"
place_to_save = "/content/shakespeare.txt"

get_file_from_url(file_url, place_to_save)

print("")
print("Datei wurde vorbereitet...")

Die Datei wurde herunter geladen und angelegt: https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt

Datei wurde vorbereitet...


In [None]:
# Unnötige Zeilen am Ende und am Start entfernen

file_source = "/content/shakespeare.txt"
file_target = "/content/shakespeare_neu.txt"

cut_file(244,124438,file_source, file_target)

print("")
print("Die Arbeitsdatei ist vorbereitet...")


Start: 244
Ende: 124438

Datei wurde beschnitten...

Die Arbeitsdatei ist vorbereitet...


# Auszählen der Wörter

Um mit Spark arbeiten zu können, muss als erstes eine Verbindung zu Spark in Form eines SparkContext aufgebaut werden. In dem hier verwendeten Code wird ein SparkContext erzeugt, welcher die Bezeichnung WordCounter erhält. Er soll lokal laufen und hierbei parallel alle verfügbaren Kerne verwenden. Dieser Block kann in einer Anwendung nur ein Mal ausgeführt werden.

Anschließend wird die Textdatei eingelesen und gibt ein RDD in Form einer Liste von String zurück. In diesen Fall entsprechen die Strings den Zeilen der Textdatei. Die Methode map führt auf jedes Element des zugrunde liegenden RDD - also den Zeilen der Textdatei - die angegebene Funktion aus.

In dem hier vorliegenden Fall findet zunächst eine Reihe von Ersetzungen (replace), dann eine Konvertierung in Kleinbuchstaben (lower) und am Schluss eine Filterung (filter) auf leere Zeilen statt. Als Ergebnis wird ein neues RDD vom Typ String zurückgegeben. Das ursprüngliche RDD wird nicht verändert. Es ist immutable. Die Verwendung einer FluentApi bewirkt eine übersichtliche Strukturierung des Codes.

In der folgenden Codesequenze wird jedes Listenelement des RDD durch flatMap in seine einzelnen Wörter aufgeteilt. Für jedes Wort wird ein Tupel erzeugt und zurückgegeben. Da es sich um eine flatMap handelt, verfügt das zurück gegebene RDD nur noch über eine sehr lange Liste von Tupel. Die Funktion reduceByKey merged im Anschluss die einzelnen Tupel. Als Ergebnis erhält man eine Liste von Tupel mit eindeutigen Wörtern und deren Vorkommen.

Mit der Methode sortBy wird auf die Anzahl der Wortvorkommen sortiert. Das zurück gegebene RDD sorted_counts kann im Anschluss ausgegeben werden, nachdem mit collect alle Werte eingesammelt wurden.

In [None]:
# Erzeugen einer Spark Session

session = SparkSession.builder.appName("Wordcount").getOrCreate()

print("Die Spark Session wurde angelegt...")

Die Spark Session wurde angelegt...


In [None]:
# Auszählen der Wörter

import pyspark.sql.functions as func

dfx = session.read.text(file_target)

top_out = 30
top_length = 30

print("")
print("Ausgabe der ersten {} Zeilen des Textes".format(top_out))
print("")

dfx.show(n=top_out,truncate=False)

#dfx.printSchema()
#dfx.describe().show()
#print(dfx.columns)

#https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.explode
#https://dwgeek.com/replace-pyspark-dataframe-column-value-methods.html/

dfx=dfx.withColumn('value', func.translate('value', ',', ' '))
dfx=dfx.withColumn('value', func.translate('value', '.', ' '))
dfx=dfx.withColumn('value', func.translate('value', '-', ' '))
dfx=dfx.withColumn('value', func.lower('value'))

print("")
print("Ausgabe der {} größten Vorkommen".format(top_length))
print("")

dfx=dfx.withColumn('value2',func.explode(func.split(func.col('value'), ' ')))\
  .groupBy('value2')\
  .count()\
  .sort('count', ascending=False)\
  .show(n=top_length,truncate=False)


Ausgabe der ersten 30 Zeilen des Textes

+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1609                                                   |
|                                                       |
|THE SONNETS                                            |
|                                                       |
|by William Shakespeare                                 |
|                                                       |
|                                                       |
|                                                       |
|                     1                                 |
|  From fairest creatures we desire increase,           |
|  That thereby beauty's rose might never die,          |
|  But as the riper should by time decease,             |
|  His tender heir might bear his memory:               |
|  But thou contracted to thin