In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=1b608722be15e0c7a6b85ad48a82341481f6a4155b6580f61b75214346fde26c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Új belépési pont: SparkSession

from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

In [3]:
# Adathalmaz betöltése csv fájlból

df = spark.read\
.format("csv")\
.load("/content/drive/MyDrive/Colab Notebooks/1-2-3/dolgozo.csv")

In [5]:
# A show() függvény megmutatja a DataFrame tartalmát, táblázatos formában
# Sajnos automatikusan nem ismerte fel a fejlécet

df.show(5)

+----+-----+-----------+------+---------+-------+-------+-----+
| _c0|  _c1|        _c2|   _c3|      _c4|    _c5|    _c6|  _c7|
+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
|7839| KING|  PRESIDENT|  0000|81-NOV-17|   5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566|JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
+----+-----+-----------+------+---------+-------+-------+-----+
only showing top 5 rows



In [6]:
# Az adathalmaz sémája
# A séma határozza meg azt, hogy milyen nevű és típusú oszlopai vannak az adatunknak

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [7]:
# Belolvasás közben megadhatjuk, hogy az első sor fejlécként kerüljön beovlasásra
# Az inferschema segítségével automatikusan beállítódnak a típusok

df = spark.read.format("csv")\
.option('header', True)\
.option("inferschema", True)\
.load("/content/drive/MyDrive/Colab Notebooks/1-2-3/dolgozo.csv")

#df.show(5)
df.printSchema()

root
 |-- DKOD: integer (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: integer (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: integer (nullable = true)
 |-- JUTALEK: integer (nullable = true)
 |-- OAZON: integer (nullable = true)



In [8]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

dolgozoSchema = StructType([\
    StructField("DKOD",IntegerType()), \
    StructField("DNEV",StringType()), \
    StructField("FOGLALKOZAS",StringType()), \
    StructField("FONOKE",IntegerType()), \
    StructField("BELEPES",StringType()), \
    StructField("FIZETES",IntegerType()), \
    StructField("JUTALEK",IntegerType()), \
    StructField("OAZON",IntegerType())
  ])

df = spark.read.format("csv")\
.schema(dolgozoSchema)\
.option('header', True)\
.option('enforceSchema', True)\
.load("/content/drive/MyDrive/Colab Notebooks/1-2-3/dolgozo.csv")

df.printSchema()

root
 |-- DKOD: integer (nullable = true)
 |-- DNEV: string (nullable = true)
 |-- FOGLALKOZAS: string (nullable = true)
 |-- FONOKE: integer (nullable = true)
 |-- BELEPES: string (nullable = true)
 |-- FIZETES: integer (nullable = true)
 |-- JUTALEK: integer (nullable = true)
 |-- OAZON: integer (nullable = true)



In [9]:
# Ki tudjuk listázni a DataFrame-ünk oszlopait

df.columns

['DKOD',
 'DNEV',
 'FOGLALKOZAS',
 'FONOKE',
 'BELEPES',
 'FIZETES',
 'JUTALEK',
 'OAZON']

In [10]:
# DataFrame-et nem csak beolvasni lehet, hanem manuálisan létrehozni is
# Egyszerű példa:

dSchema = StructType([\
    StructField("DKOD",IntegerType(),True), \
    StructField("DNEV",StringType(),True), \
    StructField("FIZETES",IntegerType(),True)
  ])

rows = [(1111, 'MR.BLACK', 1000),
       (2222, 'MR.WHITE', 2000),
       (3333, 'MR.PINK', 3000)]

dolgozoRDD = spark.sparkContext.parallelize(rows)
dolgozoDF = spark.createDataFrame(dolgozoRDD, dSchema)
dolgozoDF.show()

+----+--------+-------+
|DKOD|    DNEV|FIZETES|
+----+--------+-------+
|1111|MR.BLACK|   1000|
|2222|MR.WHITE|   2000|
|3333| MR.PINK|   3000|
+----+--------+-------+



In [11]:
# Oszlopok kiválasztása

df.select('DNEV').show(2)
df.select('DNEV', 'FIZETES').show(2)

+-----+
| DNEV|
+-----+
| KING|
|BLAKE|
+-----+
only showing top 2 rows

+-----+-------+
| DNEV|FIZETES|
+-----+-------+
| KING|   5000|
|BLAKE|   2850|
+-----+-------+
only showing top 2 rows



In [12]:
# Kifejezések a select-ben (új oszlop létrheozása kifejezés alapján)

df.selectExpr(
"*",  #az osszes kereset elotti reszt jelenti
"(FIZETES + JUTALEK) as kereset")\
.show(5)

+----+------+-----------+------+---------+-------+-------+-----+-------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|kereset|
+----+------+-----------+------+---------+-------+-------+-----+-------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   2850|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   2450|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   2975|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   2650|
+----+------+-----------+------+---------+-------+-------+-----+-------+
only showing top 5 rows



In [13]:
# Aggregáció a select-ben

df.selectExpr("avg(FIZETES)", "count(distinct(OAZON))").show()

+------------+---------------------+
|avg(FIZETES)|count(DISTINCT OAZON)|
+------------+---------------------+
|   1976.5625|                    3|
+------------+---------------------+



In [14]:
# Új oszlop létrehozása és oszlop átnevezése

from pyspark.sql.functions import expr

df.withColumn("kereset", expr("FIZETES + JUTALEK")).show(2) # 'kereset' oszloppal valo kiegeszites

df.withColumnRenamed("FIZETES", 'HETI_FIZETES').show(2)  # 'fizetes' oszlop atnevezese

# Oszlop törlése
df.drop('FONOKE').show(2)

+----+-----+-----------+------+---------+-------+-------+-----+-------+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|kereset|
+----+-----+-----------+------+---------+-------+-------+-----+-------+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   5000|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   2850|
+----+-----+-----------+------+---------+-------+-------+-----+-------+
only showing top 2 rows

+----+-----+-----------+------+---------+------------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|HETI_FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+------------+-------+-----+
|7839| KING|  PRESIDENT|     0|81-NOV-17|        5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|        2850|      0|   30|
+----+-----+-----------+------+---------+------------+-------+-----+
only showing top 2 rows

+----+-----+-----------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|  BELEPES|FIZETES|JUTAL

In [15]:
# Sorok szűrése
from pyspark.sql.functions import col

# Filter kulcsszó használatával:
# df.filter(col('DEST') == 'United States').show(4)

# Where kulcsszó használatával (mindkettő ugyanazt csinálja):
df.where(col('OAZON') == 20).where(col('FIZETES') >= 3000).show()

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7902| FORD|    ANALYST|  7566|81-DEC-03|   3000|      0|   20|
|7788|SCOTT|    ANALYST|  7566|82-DEC-09|   3000|      0|   20|
+----+-----+-----------+------+---------+-------+-------+-----+



In [16]:
# Példa a VAGY használatára (ugyanígy működik az & operátor is)

df.where( (col('FONOKE') == 7839) | (col('FIZETES')>3000) ).show()

+----+-----+-----------+------+---------+-------+-------+-----+
|DKOD| DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+-----+-----------+------+---------+-------+-------+-----+
|7839| KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7698|BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|
|7782|CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7566|JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|
+----+-----+-----------+------+---------+-------+-------+-----+



In [17]:
# További műveletek

# Duplikátumok eltávolítása
df.select(col('OAZON')).distinct().count()

3

In [18]:
df.orderBy(col('OAZON'), col('FIZETES').desc()).show(5)


+----+------+-----------+------+---------+-------+-------+-----+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|
+----+------+-----------+------+---------+-------+-------+-----+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|
|7934|MILLER|      CLERK|  7782|82-JAN-23|   1300|      0|   10|
|7877|  LOLA|      CLERK|  7902|81-JAN-12|    800|      0|   10|
|7902|  FORD|    ANALYST|  7566|81-DEC-03|   3000|      0|   20|
+----+------+-----------+------+---------+-------+-------+-----+
only showing top 5 rows



In [21]:
# Csoportosítás
df.groupBy('OAZON').count().show()

+-----+-----+
|OAZON|count|
+-----+-----+
|   20|    6|
|   10|    4|
|   30|    6|
+-----+-----+



In [22]:
# Aggregáció csoportosítás után az agg() függvénnyel
# Az agg() függvénybe más aggregációs függvény is kerülhet: count, sum, min, max stb.

df.groupBy('OAZON').agg( countDistinct('DNEV'), sum('FIZETES'),max('JUTALEK') ).show()

+-----+--------------------+------------+------------+
|OAZON|count(DISTINCT DNEV)|sum(FIZETES)|max(JUTALEK)|
+-----+--------------------+------------+------------+
|   20|                   6|       12675|         300|
|   10|                   4|        9550|           0|
|   30|                   6|        9400|        1400|
+-----+--------------------+------------+------------+



In [23]:
# Összekapcsolás
# Összekapcsoláshoz olvassunk be egy másik DataFrame-et
# df.join(df2, join_expression, join_type)

osztaly = spark.read.format("csv")\
.option('header', True)\
.option("inferschema", True)\
.load("/content/drive/MyDrive/Colab Notebooks/1-2-3/osztaly.csv")

osztaly.show()

+-----+----------+---------+
|OAZON|       NEV|TELEPHELY|
+-----+----------+---------+
|   10|ACCOUNTING| NEW YORK|
|   20|  RESEARCH|   DALLAS|
|   30|     SALES|  CHICAGO|
|   40|OPERATIONS|   BOSTON|
+-----+----------+---------+



In [24]:
# Összekapcsolás
# df.join(df2, join_expression, join_type)

df.alias('D')\
.join(osztaly.alias('O'), col('D.OAZON') == col('O.OAZON'), 'inner')\
.show(5)

+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|DKOD|  DNEV|FOGLALKOZAS|FONOKE|  BELEPES|FIZETES|JUTALEK|OAZON|OAZON|       NEV|TELEPHELY|
+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
|7839|  KING|  PRESIDENT|     0|81-NOV-17|   5000|      0|   10|   10|ACCOUNTING| NEW YORK|
|7698| BLAKE|    MANAGER|  7839|81-MAY-01|   2850|      0|   30|   30|     SALES|  CHICAGO|
|7782| CLARK|    MANAGER|  7839|81-JUN-09|   2450|      0|   10|   10|ACCOUNTING| NEW YORK|
|7566| JONES|    MANAGER|  7839|81-APR-02|   2975|      0|   20|   20|  RESEARCH|   DALLAS|
|7654|MARTIN|   SALESMAN|  7698|81-SEP-28|   1250|   1400|   30|   30|     SALES|  CHICAGO|
+----+------+-----------+------+---------+-------+-------+-----+-----+----------+---------+
only showing top 5 rows



In [25]:
# Tábla létrehozás a DataFrame alapján - ezután lehetőségünk van a tábla nevére hivatkozni SQL lekérdezéseknél

df.createOrReplaceTempView("dolgozoTable")

In [26]:
# SQL lekérdezés írása

spark.sql('SELECT DNEV FROM dolgozoTable').show(5)

+------+
|  DNEV|
+------+
|  KING|
| BLAKE|
| CLARK|
| JONES|
|MARTIN|
+------+
only showing top 5 rows



In [27]:
# Komplexebb lekérdezés is írható

spark.sql('''
SELECT OAZON, count(*)
FROM dolgozoTable
GROUP BY OAZON
''').show()

+-----+--------+
|OAZON|count(1)|
+-----+--------+
|   20|       6|
|   10|       4|
|   30|       6|
+-----+--------+



In [28]:
# Végrehajtási tervek

spark.sql('SELECT OAZON, count(*) FROM dolgozoTable GROUP BY OAZON').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[OAZON#115], functions=[count(1)])
   +- Exchange hashpartitioning(OAZON#115, 200), ENSURE_REQUIREMENTS, [plan_id=674]
      +- HashAggregate(keys=[OAZON#115], functions=[partial_count(1)])
         +- FileScan csv [OAZON#115] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/1-2-3/dolgozo.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<OAZON:int>




In [29]:
# Ugyanaz a végrehajtási terv a Sparkos megoldás és az SQL-es megoldás során is

df.groupBy('OAZON').count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[OAZON#115], functions=[count(1)])
   +- Exchange hashpartitioning(OAZON#115, 200), ENSURE_REQUIREMENTS, [plan_id=687]
      +- HashAggregate(keys=[OAZON#115], functions=[partial_count(1)])
         +- FileScan csv [OAZON#115] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/1-2-3/dolgozo.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<OAZON:int>




In [31]:
# Retail adatok beolvasása

retail = spark.read.format("csv")\
.option('header', True)\
.option("inferschema", True)\
.load("/content/drive/MyDrive/Colab Notebooks/1-2-3/online_retail_data.csv")

retail.createOrReplaceTempView("retail_table")
retail.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [32]:
# Kiszámíthatunk alapvető statisztikákat a DataFrame oszlopaira

retail.select('UnitPrice', 'Quantity').summary().show()

+-------+-----------------+------------------+
|summary|        UnitPrice|          Quantity|
+-------+-----------------+------------------+
|  count|           542014|            542014|
|   mean|4.611339197874949| 9.555284549845576|
| stddev| 96.7509795804258|218.06048544770684|
|    min|        -11062.06|            -80995|
|    25%|             1.25|                 1|
|    50%|             2.08|                 3|
|    75%|             4.13|                10|
|    max|          38970.0|             80995|
+-------+-----------------+------------------+



In [35]:
# Melyik országban él a legtöbb vásárló? (Spark)

retail\
.groupBy("Country")\
.agg(countDistinct("CustomerID").alias("cnt"))\
.orderBy(col("cnt").desc())\
.limit(5)\
.show()

+--------------+----+
|       Country| cnt|
+--------------+----+
|United Kingdom|3950|
|       Germany|  95|
|        France|  87|
|         Spain|  31|
|       Belgium|  25|
+--------------+----+



In [36]:
# Melyik országban él a legtöbb vásárló? (SQL)

spark.sql('''
select Country, count(distinct CustomerID) as cnt from retail_table
group by Country
order by cnt desc LIMIT 1
''').show()

+--------------+----+
|       Country| cnt|
+--------------+----+
|United Kingdom|3950|
+--------------+----+



In [37]:
# Adjuk meg azt az 5 országot, amelyekből a legtöbb bevétel származik (Spark)

retail.selectExpr('*', '(Quantity * UnitPrice) as bevetel')\
.groupBy('Country')\
.sum('bevetel')\
.orderBy(col('sum(bevetel)').desc() )\
.show(5)

+--------------+------------------+
|       Country|      sum(bevetel)|
+--------------+------------------+
|United Kingdom| 8208343.203998724|
|   Netherlands| 284661.5399999992|
|          EIRE| 263276.8199999992|
|       Germany|221698.21000000037|
|        France| 197463.5900000004|
+--------------+------------------+
only showing top 5 rows



In [38]:
# Adjuk meg azt az 5 országot, amelyekből a legtöbb bevétel származik (SQL)

spark.sql('''
select Country, sum(Quantity * UnitPrice)
from retail_table
group by Country
order by sum(Quantity * UnitPrice) desc
''').show(5)

+--------------+---------------------------+
|       Country|sum((Quantity * UnitPrice))|
+--------------+---------------------------+
|United Kingdom|          8208343.203998724|
|   Netherlands|          284661.5399999992|
|          EIRE|          263276.8199999992|
|       Germany|         221698.21000000037|
|        France|          197463.5900000004|
+--------------+---------------------------+
only showing top 5 rows



In [39]:
# Melyik termék termelte a legnagyobb bevételt? (SQL)

spark.sql('''
select StockCode, Description, sum(Quantity * UnitPrice) from retail_table
group by StockCode, Description
order by sum(Quantity * UnitPrice) desc
''').show(1)

+---------+--------------+---------------------------+
|StockCode|   Description|sum((Quantity * UnitPrice))|
+---------+--------------+---------------------------+
|      DOT|DOTCOM POSTAGE|         206245.48000000004|
+---------+--------------+---------------------------+
only showing top 1 row



In [40]:
# Melyik a legnépszerűbb termék (amelyikből a legtöbb darabot eladták)? (Spark)

retail\
.groupBy("StockCode")\
.agg(sum(col("Quantity")).alias("db"))\
.orderBy(desc("db"))\
.limit(1)\
.show()

+---------+-----+
|StockCode|   db|
+---------+-----+
|    22197|56502|
+---------+-----+



In [41]:
# Melyik a legnépszerűbb termék (amelyikből a legtöbb darabot eladták)? (SQL)

spark.sql('''
select StockCode, sum(Quantity) from retail_table
group by StockCode
order by sum(Quantity) desc
''').show(1)

+---------+-------------+
|StockCode|sum(Quantity)|
+---------+-------------+
|    22197|        56502|
+---------+-------------+
only showing top 1 row



In [42]:
# Átlagosan hány különböző terméket vesz egy vásárló egy vásárlás során? (Spark)

retail2 = retail.groupBy("InvoiceNo").agg(count(col("*")).alias("db"))
retail2.select(avg(col("db"))).show()

+------------------+
|           avg(db)|
+------------------+
|20.927181467181466|
+------------------+



In [43]:
# Átlagosan hány különböző terméket vesz egy vásárló egy vásárlás során? (SQL)

spark.sql('''
select avg(db) from
(select InvoiceNo, count(*) db from retail_table
group by InvoiceNo)
''').show(5)

+------------------+
|           avg(db)|
+------------------+
|20.927181467181466|
+------------------+



In [44]:
# Melyik tranzakció során vásárolták a legtöbb különböző terméket és hányat? (Spark)

retail\
.groupBy("InvoiceNo")\
.agg(count(col("*")).alias("cnt"))\
.orderBy(desc("cnt"))\
.limit(1)\
.show()

+---------+----+
|InvoiceNo| cnt|
+---------+----+
|   573585|1114|
+---------+----+



In [45]:
# Melyik tranzakció során vásárolták a legtöbb különböző terméket és hányat? (SQL)

spark.sql('''
select InvoiceNo, count(*) from retail_table
group by InvoiceNo
order by count(*) desc
''').show(1)

+---------+--------+
|InvoiceNo|count(1)|
+---------+--------+
|   573585|    1114|
+---------+--------+
only showing top 1 row

