# DataFrame API

Inicializcáió kicsit megváltozik:

In [1]:
from pyspark.sql import  *
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

Több fájlformátumból is létrehozható DataFrame.

In [2]:
df = spark.read.format('csv').load('dolgozo.csv')
# vagy spark.read.csv('dolgozo.csv')
df

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string]

A DataFrame táblázatosan kiiratható a `show()` metódussal.

In [3]:
df.show()

+----+------+-----------+------+---------+-------+-------+-----+
| _c0|   _c1|        _c2|   _c3|      _c4|    _c5|    _c6|  _c7|
+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
|7839|  KING|  PRESIDENT|  0000|81-NOV-17|   5000|      0|   10|
|7698| BLAKE|    MANAGER| 783d9|81-MAY-01|   2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|
|7499| ALLEN|   SALESMAN|  7698|81-FEB-20|   1600|    300|   30|
|7844|TURNER|   SALESMAN|  7698|81-SEP-08|   1500|      0|   30|
|7900| JAMES|      CLERK|  7698|81-DEC-03|    950|      0|   30|
|7521|  WARD|   SALESMAN|  7698|81-FEB-22|   1250|    500|   30|
|7902|  FORD|    ANALYST|  7566|81-DEC-03|   3000|      0|   20|
|7369| SMITH|      CLERK|  7902|80-DEC-17|    800|      0|   20|
|7788| SCOTT|    ANALYST|

CSV fájlok esetén meg kell adni, van-e fejléc sor. Ezt az option metódus meghívásával tudjuk megtenni.

In [4]:
df = spark.read.option('header', True).csv('dolgozo.csv')

In [5]:
df.show()

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|  0000|81-NOV-17|   5000|      0|   10|
|7698| BLAKE|    MANAGER| 783d9|81-MAY-01|   2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|
|7499| ALLEN|   SALESMAN|  7698|81-FEB-20|   1600|    300|   30|
|7844|TURNER|   SALESMAN|  7698|81-SEP-08|   1500|      0|   30|
|7900| JAMES|      CLERK|  7698|81-DEC-03|    950|      0|   30|
|7521|  WARD|   SALESMAN|  7698|81-FEB-22|   1250|    500|   30|
|7902|  FORD|    ANALYST|  7566|81-DEC-03|   3000|      0|   20|
|7369| SMITH|      CLERK|  7902|80-DEC-17|    800|      0|   20|
|7788| SCOTT|    ANALYST|  7566|82-DEC-09|   3000|      0|   20|
|7876| ADAMS|      CLERK|

A DataFrame oszlopokból áll ezek neveit, típusait és az egyes oszlopokra vonatkozó korlátozásokat sémának nevezzük. A séma kiírható a következő módon:

In [13]:
df.printSchema()

root
 |-- DKOD: string (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: string (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: string (nullable = true)
 |-- JUTALEK: string (nullable = true)
 |-- OAZON: string (nullable = true)


## Sémakikövetkezetetés

A sémakikövetkezés az inferSchema opció True paraméteres megadásával bekapcsolható. Ennek hatására a Spark az egyes értékek parse-olhatósága alapján állítja be a típusokat.

In [6]:
df = spark.read.format('csv')\
    .option('header', True)\
    .option('inferSchema', True)\
    .load('dolgozo.csv')

In [8]:
df.printSchema()

root
 |-- DKOD: integer (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: string (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: integer (nullable = true)
 |-- JUTALEK: integer (nullable = true)
 |-- OAZON: integer (nullable = true)


## Manuális sémamegadás
`pyspark.sql.types` csomagban található objektumok segítségével történik.

In [9]:
from pyspark.sql.types import StringType,StructType, StructField, IntegerType

dolgozoSchema = StructType([
    StructField('DKOD', IntegerType()),
    StructField('DNEV', StringType()),
    StructField('FOGLALKOZAS', StringType()),
    StructField('FONOKE', IntegerType()),
    StructField('BELEPES', StringType()),
    StructField('FIZETES', IntegerType()),
    StructField('JUTALEK', IntegerType()),
    StructField('OAZON', IntegerType()),
])

In [10]:
df = spark.read.schema(dolgozoSchema).option('header', True).csv('dolgozo.csv')
df.printSchema()

root
 |-- DKOD: integer (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: integer (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: integer (nullable = true)
 |-- JUTALEK: integer (nullable = true)
 |-- OAZON: integer (nullable = true)


## Műveletek
A műveletek többnyire az SQL-ből ismert műveletek.

Az oszlopokra több módon is tudunk hivatkozni. Sok műveleten esetén elég az oszlop nevét stringgként megadni. Lehetőségünk van a dataframe adattagjaként is elérni őket. Ugyan erre a célre szolgál a `col()` függvény is (ennek akkor van értelme ha pusztán az oszlopnevet tartalmazó literálból kontextusából nem következtethető ki, hogy oszlopra akarunk referálni).

In [12]:
df.select('DNEV', df.FIZETES, df['FOGLALKOZAS'], col('OAZON')).show()

+------+-------+-----------+-----+
|  DNEV|FIZETES|FOGLALKOZAS|OAZON|
+------+-------+-----------+-----+
|  KING|   5000|  PRESIDENT|   10|
| BLAKE|   2850|    MANAGER|   30|
| CLARK|   2450|    MANAGER|   10|
| JONES|   2975|    MANAGER|   20|
|MARTIN|   1250|   SALESMAN|   30|
| ALLEN|   1600|   SALESMAN|   30|
|TURNER|   1500|   SALESMAN|   30|
| JAMES|    950|      CLERK|   30|
|  WARD|   1250|   SALESMAN|   30|
|  FORD|   3000|    ANALYST|   20|
| SMITH|    800|      CLERK|   20|
| SCOTT|   3000|    ANALYST|   20|
| ADAMS|   1100|      CLERK|   20|
|MILLER|   1300|      CLERK|   10|
|  LOLA|    800|      CLERK|   10|
| BLACK|   1800|       NULL|   20|
+------+-------+-----------+-----+


A vetítés során tudunk új oszlopokat is bevezetni. (megjegyzés: itt nem tudunk az oszlopnevekre csak mint string literál hivatkozni)

In [13]:
df.select(df.DNEV, (12 * df.FIZETES + 12 * df.JUTALEK).alias('KERESET')).show()

+------+-------+
|  DNEV|KERESET|
+------+-------+
|  KING|  60000|
| BLAKE|  34200|
| CLARK|  29400|
| JONES|  35700|
|MARTIN|  31800|
| ALLEN|  22800|
|TURNER|  18000|
| JAMES|  11400|
|  WARD|  21000|
|  FORD|  36000|
| SMITH|   9600|
| SCOTT|  36000|
| ADAMS|  13200|
|MILLER|  15600|
|  LOLA|   9600|
| BLACK|  25200|
+------+-------+


Mint SQL-ben a `*` minden oszlopot kiválaszt.

In [14]:
df.select('*').show(5)

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698| BLAKE|    MANAGER|  NULL|81-MAY-01|   2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|
+----+------+-----------+------+---------+-------+-------+-----+


Tudunk SQL kifejezés alapján is vetíteni, erről bővebben az `SQL.ipyn` fájlban van szó.

In [15]:
df.selectExpr('*', '(FIZETES + JUTALEK) as KERESET').show(5)

+----+------+-----------+------+---------+-------+-------+-----+-------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|KERESET|
+----+------+-----------+------+---------+-------+-------+-----+-------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698| BLAKE|    MANAGER|  NULL|81-MAY-01|   2850|      0|   30|   2850|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   2450|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   2975|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   2650|
+----+------+-----------+------+---------+-------+-------+-----+-------+


In [16]:
df.selectExpr('avg(FIZETES)', 'count(distinct FOGLALKOZAS)').show()

+------------+---------------------------+
|avg(FIZETES)|count(DISTINCT FOGLALKOZAS)|
+------------+---------------------------+
|   1976.5625|                          5|
+------------+---------------------------+


Ha jobban kedveljük a programozott megoldást, akkor ugyan úgy elérhető művelet minden SQL DQL utasításra.

Ez például egy másik mód az új oszlop felvételére. Az `expr()` függvény paramétereként megadott utasítás SQL-ként fog értelmezésre kerülni.

In [17]:
df.withColumn('KERESET', expr('FIZETES + JUTALEK')).show(4)

+----+-----+-----------+------+---------+-------+-------+-----+-------+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|KERESET|
+----+-----+-----------+------+---------+-------+-------+-----+-------+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698|BLAKE|    MANAGER|  NULL|81-MAY-01|   2850|      0|   30|   2850|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   2450|
|7566|JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   2975|
+----+-----+-----------+------+---------+-------+-------+-----+-------+


Ezek a megoldások szinte mindig ekvivalens végrehajtási terveket eredményeznek. Két DataFrame lekérdezés ekvivalens ha a végrehajtási terveik megegyeznek, amit a `sameSemantics(dataframe)` metódussal tudunk ellenőrizni.

In [18]:
df.withColumn('KERESET', df.FIZETES + df.JUTALEK).sameSemantics(
    df.withColumn('KERESET', expr('FIZETES + JUTALEK'))
)

True

Magát a lekérdezési tervet is le tudjuk kérni:

In [19]:
df.withColumn('KERESET', df.FIZETES + df.JUTALEK).explain()

== Physical Plan ==
*(1) Project [DKOD#183, DNEV#184, FOGLALKOZAS#185, FONOKE#186, BELEPES#187, FIZETES#188, JUTALEK#189, OAZON#190, (FIZETES#188 + JUTALEK#189) AS KERESET#459]
+- FileScan csv [DKOD#183,DNEV#184,FOGLALKOZAS#185,FONOKE#186,BELEPES#187,FIZETES#188,JUTALEK#189,OAZON#190] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/bb200/Documents/elte-ik-bsc/5/bigdata/spark/07/dolgozo...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DKOD:int,DNEV:string,FOGLALKOZAS:string,FONOKE:int,BELEPES:string,FIZETES:int,JUTALEK:int,...


Oszlopátnevezés:

In [20]:
df.withColumnRenamed('FIZETES', 'HAVI_FIZETES').show(5)

+----+------+-----------+------+---------+------------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|HAVI_FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+------------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|        5000|      0|   10|
|7698| BLAKE|    MANAGER|  NULL|81-MAY-01|        2850|      0|   30|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|        2450|      0|   10|
|7566| JONES|    MANAGER|  7839|81-APR-02|        2975|      0|   20|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|        1250|   1400|   30|
+----+------+-----------+------+---------+------------+-------+-----+


A `drop()` metódussal lehetőségünk van oszlopokat elhagyni.

In [21]:
df.drop(df.OAZON).show(4)

+----+-----+-----------+------+---------+-------+-------+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|
+----+-----+-----------+------+---------+-------+-------+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|
|7566|JONES|    MANAGER|  7839|81-APR-02|   2975|      0|
+----+-----+-----------+------+---------+-------+-------+


Természetesen rendelkezésre áll a szűrés művelete is:

In [22]:
df.where(df.FIZETES > 3000).select(df.DNEV).show(5)

+----+
|DNEV|
+----+
|KING|
+----+


A `where()` valójában a `filter()`-t hívja meg. Feltételeket az `&` (és), `|` (vagy), `~` (nem) logikai műveletekkel tudjuk összekapcsolni.

In [27]:
df.filter((df.FIZETES > 2000) & df.OAZON != 20).show(4)

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
+----+-----+-----------+------+---------+-------+-------+-----+


Ha indokolt ajánlott a logikai összekötést használni, mert a `where()` láncolás más végrehasjtási tervet eredményez.

In [41]:
df.where(df.FIZETES > 2000).where(df.OAZON != 20).sameSemantics(df.where((df.FIZETES > 2000) & (df.OAZON != 20)))

False

A dataframe ismétlődéseit `distinct()` metódussal tudjuk megszüntetni. A `count()`-al pedig le tudjuk kérni a dataframe sorainak számát.

In [None]:
df.crosstab()

In [31]:
df.select(df.FOGLALKOZAS).distinct().count()

6

A dataframe rendezhető. Ez, hogy az adott oszlop szerint növekvő vagy csökkenő sorrend legyen az `asc()` és `desc()` hívásokkal specifikálható. A `limit()` metódussal eldobhatók a dataframe első `n` soron kívüli sorai.

A `limit` és a `take`/`show` nem ekvivalensek, mivel egyedül a limit ad vissza új dataframe-et. 

In [33]:
df.orderBy(df.OAZON.asc(), df.FIZETES.desc()).limit(5).show()

+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7934|MILLER|      CLERK|  7782|82-JAN-23|   1300|      0|   10|
|7877|  LOLA|      CLERK|  7902|81-JAN-12|    800|      0|   10|
|7788| SCOTT|    ANALYST|  7566|82-DEC-09|   3000|      0|   20|
+----+------+-----------+------+---------+-------+-------+-----+


Csoportosításra is lehetőségünk van.

In [44]:
df.groupBy(df.OAZON, df.FOGLALKOZAS).count().show()

+-----+-----------+-----+
|OAZON|FOGLALKOZAS|count|
+-----+-----------+-----+
|   20|    ANALYST|    2|
|   20|       NULL|    1|
|   20|    MANAGER|    1|
|   30|    MANAGER|    1|
|   30|   SALESMAN|    4|
|   30|      CLERK|    1|
|   10|  PRESIDENT|    1|
|   20|      CLERK|    2|
|   10|      CLERK|    2|
|   10|    MANAGER|    1|
+-----+-----------+-----+


A csoportokat összegezhetjük is. Ehhez az SQL-ben megismert összegző függvények is adottak (`avg`, `max`, `min`, `sum`, `count`).

In [34]:
df.groupBy(df.OAZON).agg(sum(df.FIZETES)).alias('OSSZ').show()

+-----+------------+
|OAZON|sum(FIZETES)|
+-----+------------+
|   20|       12675|
|   10|        9550|
|   30|        9400|
+-----+------------+


Illetve lehetőségünk van összekapcsolni dataframe-eket.

In [35]:
osztaly_df = spark.read.option('header', True).option('inferSchema', True).csv('osztaly.csv')

In [36]:
osztaly_df.show()

+-----+----------+---------+
|OAZON|       NEV|TELEPHELY|
+-----+----------+---------+
|   10|ACCOUNTING| NEW YORK|
|   20|  RESEARCH|   DALLAS|
|   30|     SALES|  CHICAGO|
|   40|OPERATIONS|   BOSTON|
+-----+----------+---------+


Az összekapcsolási feltétel a második paraméterként feltételek tömbjeként adandó meg. Egy feltétel esetén a tömb elhagyható.
Harmadik paraméter az összekapcsolás típusa.

In [37]:
df.join(osztaly_df, df.OAZON == osztaly_df.OAZON, 'inner').show()

+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|OAZON|       NEV|TELEPHELY|
+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   10|ACCOUNTING| NEW YORK|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   30|     SALES|  CHICAGO|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   10|ACCOUNTING| NEW YORK|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   20|  RESEARCH|   DALLAS|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   30|     SALES|  CHICAGO|
|7499| ALLEN|   SALESMAN|  7698|81-FEB-20|   1600|    300|   30|   30|     SALES|  CHICAGO|
|7844|TURNER|   SALESMAN|  7698|81-SEP-08|   1500|      0|   30|   30|     SALES|  CHICAGO|
|7900| JAMES|      CLERK|  7698|81-DEC-03|    950|      0|   30|   30|     SALES

A természetes összekapcsoláshoz elég csak az oszlop nevét megadni.

In [38]:
df.join(osztaly_df, 'OAZON').show()

+-----+----+------+-----------+------+---------+-------+-------+----------+---------+
|OAZON|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|       NEV|TELEPHELY|
+-----+----+------+-----------+------+---------+-------+-------+----------+---------+
|   10|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|ACCOUNTING| NEW YORK|
|   30|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|     SALES|  CHICAGO|
|   10|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|ACCOUNTING| NEW YORK|
|   20|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|  RESEARCH|   DALLAS|
|   30|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|     SALES|  CHICAGO|
|   30|7499| ALLEN|   SALESMAN|  7698|81-FEB-20|   1600|    300|     SALES|  CHICAGO|
|   30|7844|TURNER|   SALESMAN|  7698|81-SEP-08|   1500|      0|     SALES|  CHICAGO|
|   30|7900| JAMES|      CLERK|  7698|81-DEC-03|    950|      0|     SALES|  CHICAGO|
|   30|7521|  WARD|   SALESMAN|  7698|81-FEB-22|   125