In [None]:
# default_exp df_people_basics

# Grundlagen von Sparks Dataframe

In [None]:
#hide
# Autoreload funktioniert nicht bei XDebug
#%load_ext autoreload
#%autoreload 2

In [None]:
#hide
from nbdev.showdoc import *

## Init Spark Context

In [None]:
from udemy_spark.spark_core import *
spark = get_spark_session() # Session anlegen
spark # Ausgabe der wichtigsten Session Informationen

## Laden von Dummy Daten

In [None]:
df_people = spark.read.json('people.json')
df_apple  = spark.read.csv('appl_stock.csv',inferSchema=True,header=True)

## Informationen von DF abfragen

In [None]:
df_people.show()        # zeigt die Daten
df_people.printSchema() # zeigt das Schema
df_people.columns       # Attribut mit den Columnnames
df_people.describe()    # Beschreibung

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



DataFrame[summary: string, age: string, name: string]

## Laden von Daten
### JSON
#### typisiert lesen

In [None]:
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
data_schema = [StructField("age", IntegerType(), True),StructField("name", StringType(), True)]
final_struc = StructType(fields=data_schema)
df_people = spark.read.json('people.json', schema=final_struc)
df_people.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



## Inhalt von DF anzeigen

In [None]:
df_people.head(2) # liefert eine Liste mit Row-Elementen

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [None]:
df_people.show(2) # zeigt 2 Zeilen -> df_people.show() zeigt alles

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
+----+-------+
only showing top 2 rows



## Zugriff auf Spalten

In [None]:
print(df_people['age'])
print(type(df_people['age']))

Column<b'age'>
<class 'pyspark.sql.column.Column'>


In [None]:
df_people.select('age')#.show() -> df_people.select -> returns a dataframe, not just a  column

DataFrame[age: int, name: string]

In [None]:
df_people.select(['age','name']) #.show()

DataFrame[age: int, name: string]

## Daten filtern

### Like SQL-Where

In [None]:
df_apple.filter("Close<500") # Die Filter koennen wie Where-Klauseln von SQL Statements definiert werden
df_apple.filter("Close<500").select(['Open','Close']).show(3)  # filter gibt wieder ein DF zurueck.

+----------+----------+
|      Open|     Close|
+----------+----------+
|213.429998|214.009998|
|214.599998|214.379993|
|214.379993|210.969995|
+----------+----------+
only showing top 3 rows



### Standard Python / Pandas Style

In [None]:
df_apple.filter(df_apple['Close'] < 500).show(2) # standard Pandas Style
df_apple.filter( (df_apple["Close"] < 200) & (df_apple['Open'] > 200) ).show(2) # Achtung: die Klammern sind wichtig, anstelle von 'and' und 'or' & und | verwenden
df_apple.filter( (df_apple["Close"] < 200) & ~(df_apple['Open'] > 200) ).show(2) # ~ ist not Funktion: hier "nicht groesser als 200"

+-------------------+----------+----------+------------------+----------+---------+------------------+
|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
+-------------------+----------+----------+------------------+----------+---------+------------------+
only showing top 2 rows

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.4999

### Rueckgabe als Python Objekte

In [None]:
result = df_apple.filter(df_apple["Low"] == 197.16).collect()
print(type(result)) # Python Liste
print(type(result[0])) # mit Spark Row Objekten

<class 'list'>
<class 'pyspark.sql.types.Row'>


In [None]:
print(result[0].asDict()) # eine Row kann direkt als Dict zurueckgegeben werden
for item in result[0]: # ueber die Spalten in einer Row kann auch direkt iteriert werden
    print(item, end=' | ')


{'Date': datetime.datetime(2010, 1, 22, 0, 0), 'Open': 206.78000600000001, 'High': 207.499996, 'Low': 197.16, 'Close': 197.75, 'Volume': 220441900, 'Adj Close': 25.620401}
2010-01-22 00:00:00 | 206.78000600000001 | 207.499996 | 197.16 | 197.75 | 220441900 | 25.620401 | 

## Manipulation von DF

### Neue Kolonnen erzeugen

In [None]:
df_people.withColumn('newage', df_people['age']).show()   # erzeugt ein neues DF mit einer neue Spalte basierend auf 'age'
df_people.withColumn('newage', df_people['age']*2).show()   # erzeugt ein neues DF mit einer neue Spalte basierend auf 'age'
df_people.withColumnRenamed('age','newage').show() # erzeugt neues DF mit umbenannter Spalte

## SQL verwenden

In [None]:
# Registriere das Datenframe als voruebergehende SQL-Ansicht
df_people.createOrReplaceTempView("people")

In [None]:
sql_results = spark.sql("SELECT * FROM people") # sql_results ist ein Dataframe
sql_results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

