In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# ścieżka do bazy danych hurtowni danych oraz plików
# należy dostosować do ścieżki względnej, w której umieszczony został bieżący notebook
warehouse_location = '/opt/spark/work-dir/Lab7/metastore_db'

# utworzenie sesji Spark, ze wskazaniem włączenia obsługi Hive oraz
# lokalizacją przechowywania hurtowni danych
spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Apache SQL and Hive")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .enableHiveSupport()\
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .getOrCreate()
spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/16 18:30:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/16 18:30:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# dostosuj ścieżkę do pliku do swoich danych, tutaj został utworzony mniejszy zbiór niż w poprzednim labie
df = spark.read.csv('Lab7/employee_1m.csv', header=True, inferSchema=True)

                                                                                

In [4]:
# tworzymy widok tymczasowy w pamięci węzła
df.createOrReplaceTempView("EMPLOYEE_DATA")

In [5]:
# wypisanie tabeli, zwróć uwagę na to, czy stworzona tabela jest tymczasowa czy trwała
spark.catalog.listTables()

24/12/16 18:30:47 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/16 18:30:47 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/12/16 18:30:51 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/12/16 18:30:51 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.18.0.2
24/12/16 18:30:51 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
24/12/16 18:30:51 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


[Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [6]:
# pobranie danych jak z tabeli SQL
spark.sql("Select * from EMPLOYEE_DATA limit 4").show()
spark.sql("select firstname from EMPLOYEE_DATA").show(10)

+---+----------+----------+---+-------+
| id| firstname|  lastname|age| salary|
+---+----------+----------+---+-------+
|  1|Mieczysław|    Wlotka| 21| 8653.2|
|  2|     Marek|  Kowalski| 23|5973.43|
|  3|   Wisława|     Pysla| 38|9544.06|
|  4|Aleksandra|Malinowski| 62|8804.64|
+---+----------+----------+---+-------+

+----------+
| firstname|
+----------+
|Mieczysław|
|     Marek|
|   Wisława|
|Aleksandra|
| Krzysztof|
|  Wojciech|
|     Agata|
|Aleksandra|
|      Adam|
|     Marek|
+----------+
only showing top 10 rows



In [7]:
spark.sql("select firstname, count(firstname), avg(salary) from EMPLOYEE_DATA group by firstname").show()



+----------+----------------+------------------+
| firstname|count(firstname)|       avg(salary)|
+----------+----------------+------------------+
|   Wisława|           99846| 7851.188853233986|
|Mieczysław|           99636| 7848.163473242557|
|     Agata|          100255|  7849.10577158249|
| Krzysztof|           99593| 7851.736728886598|
|     Marek|          100260|7851.2525260323155|
|      Adam|           99888|7849.1089016698925|
| Katarzyna|           99902|  7847.58591829992|
|  Wojciech|          100379|7851.9279910140585|
|  Zbigniew|          100075| 7845.564227529331|
|Aleksandra|          100166|  7848.51743855202|
+----------+----------------+------------------+



                                                                                

In [8]:
rising = 0.1 # 10% podwyżki
spark.sql(f"select firstname, lastname, salary, round(salary + salary * {rising},2) as after_rising from EMPLOYEE_DATA").show(5)

+----------+----------+-------+------------+
| firstname|  lastname| salary|after_rising|
+----------+----------+-------+------------+
|Mieczysław|    Wlotka| 8653.2|     9518.52|
|     Marek|  Kowalski|5973.43|     6570.77|
|   Wisława|     Pysla|9544.06|    10498.47|
|Aleksandra|Malinowski|8804.64|      9685.1|
| Krzysztof|     Pysla|8252.06|     9077.27|
+----------+----------+-------+------------+
only showing top 5 rows



In [9]:
spark.catalog.currentCatalog()

'spark_catalog'

In [10]:
# dla zrealizowania kolejnych przykładów dokonamy kilku modyfikacji pliku employee
# 1. dodanie kolumny ID - indeksu
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("ID", monotonically_increasing_id())

In [11]:
df.show(10)

+---+----------+-------------------+---+-------+
| ID| firstname|           lastname|age| salary|
+---+----------+-------------------+---+-------+
|  0|Mieczysław|             Wlotka| 21| 8653.2|
|  1|     Marek|           Kowalski| 23|5973.43|
|  2|   Wisława|              Pysla| 38|9544.06|
|  3|Aleksandra|         Malinowski| 62|8804.64|
|  4| Krzysztof|              Pysla| 27|8252.06|
|  5|  Wojciech|             Wlotka| 48| 6845.1|
|  6|     Agata|Brzęczyszczykiewicz| 32|7267.86|
|  7|Aleksandra|           Kowalski| 40|8924.74|
|  8|      Adam|       Mieczykowski| 40| 7417.5|
|  9|     Marek|             Szczaw| 66|7308.44|
+---+----------+-------------------+---+-------+
only showing top 10 rows



In [12]:
# dokonamy podziału danych i zapisania w różnych formatach
splits = df.randomSplit(weights=[0.3, 0.7], seed=19)

In [13]:
splits[0].count(), splits[1].count()

                                                                                

(298931, 701069)

In [14]:
# to dość dziwne zjawisko niezbyt równego podziału danych jest opisane w artykułach:
# https://medium.com/udemy-engineering/pyspark-under-the-hood-randomsplit-and-sample-inconsistencies-examined-7c6ec62644bc
# oraz
# https://www.geeksforgeeks.org/pyspark-randomsplit-and-sample-methods/

In [15]:
# większa część trafi do nowej tymczasowej tabeli
splits[1].createOrReplaceTempView("EMPLOYEE_DATA_SPLIT_1")

In [16]:
# a mniejsza do plików JSON
splits[0].write.json('employee_data.json', mode='overwrite')

                                                                                

In [17]:
!ls ./employee_data.json/*.json

./employee_data.json/part-00000-0e75328b-13d9-4878-a8ef-ded6720a090a-c000.json
./employee_data.json/part-00001-0e75328b-13d9-4878-a8ef-ded6720a090a-c000.json


In [18]:
# aby móc wykorzystać dane w przykładach ze złączaniem, zapiszemy jeszcze próbkę danych z głównej ramki
# z identyfikatorami oraz dodatkową kolumną z podwyżką
from pyspark.sql.functions import col, lit, round

lucky_guys = spark.sql("select * from EMPLOYEE_DATA").sample(0.01)\
.withColumn('rising', lit('10%')).withColumn('salary after rising', round(col('salary') * 1.1, 2))

In [21]:
# zapisujemy szczęściarzy do oddzielnej tabeli w hurtowni
lucky_guys.write.mode('overwrite').saveAsTable("lucky_employees", format='parquet')

24/12/16 18:32:43 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/12/16 18:32:44 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/12/16 18:32:44 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/12/16 18:32:44 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


In [22]:
spark.catalog.listTables()

[Table(name='lucky_employees', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [23]:
!ls ./Lab7/metastore_db/lucky_employees/*.parquet

./Lab7/metastore_db/lucky_employees/part-00000-4f2163c5-a412-47fa-9bd1-2d8ad08f55f6-c000.snappy.parquet
./Lab7/metastore_db/lucky_employees/part-00001-4f2163c5-a412-47fa-9bd1-2d8ad08f55f6-c000.snappy.parquet


In [24]:
# przykład złączania danych na różnych źródłach danych
# zapytanie SQL bezpośrednio na plikach - tutaj zapisanych wcześniej JSON-ach oraz parquet
query = """
SELECT ed.ID, ed.firstname, ed.lastname, ed.salary, lucky.rising, lucky.`salary after rising`
FROM json.`./employee_data.json/` as jtable 
join EMPLOYEE_DATA ed on jtable.ID=ed.ID 
join parquet.`./Lab7/metastore_db/lucky_employees/` as lucky on ed.ID=lucky.ID
"""
df_from_json = spark.sql(query).show(10)

24/12/16 18:32:54 WARN ObjectStore: Failed to get database json, returning NoSuchObjectException
24/12/16 18:32:54 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
                                                                                

+----+----------+----------+-------+------+-------------------+
|  ID| firstname|  lastname| salary|rising|salary after rising|
+----+----------+----------+-------+------+-------------------+
| 235|   Wisława|Wróblewski|7911.75|   10%|            8702.93|
| 346| Katarzyna|     Pysla|7008.55|   10%|            7709.41|
| 688|Aleksandra|    Szczaw|6089.69|   10%|            6698.66|
| 832| Krzysztof|    Szczaw|6348.54|   10%|            6983.39|
|1206|Aleksandra|    Wlotka|7540.97|   10%|            8295.07|
|1297|Aleksandra|  Kowalski|6756.39|   10%|            7432.03|
|1320| Krzysztof|      Glut|6746.98|   10%|            7421.68|
|2857|     Marek|  Barański|7624.63|   10%|            8387.09|
|2868|      Adam|Malinowski|7646.53|   10%|            8411.18|
|3038|  Zbigniew|    Wlotka| 7472.0|   10%|             8219.2|
+----+----------+----------+-------+------+-------------------+
only showing top 10 rows



In [25]:
# ten przykład pokazuje podział na 16 wiaderek danych bazując na podziale po kolumnie ID (tu używane jest hashowanie)
# dane posortowane są w każdym buckecie po kolumnie salary
# dane zapisywane są do hurtowni Hive, a informacje o zapisanych tam tabelach przechowywane są w
# Hive metastore (domyślnie jest do baza danych Derby)
df.write.bucketBy(16, 'ID').mode('overwrite').sortBy('salary').saveAsTable('employee_id_bucketed')

                                                                                

In [28]:
!ls Lab7/metastore_db/employee_id_bucketed/*.parquet

Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00000.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00001.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00002.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00003.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00004.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00005.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00006.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00007.c000.snappy.parquet
Lab7/metastore_db/employee_id_bucketed/part-00000-010a5756-11d3-45fc-9e83-11d9b6219aa3_00008.c00

In [29]:
spark.table('employee_id_bucketed').show(10)

+------+----------+-------------------+---+-------+
|    ID| firstname|           lastname|age| salary|
+------+----------+-------------------+---+-------+
| 74556|  Zbigniew|           Barański| 54|3314.95|
| 23035|  Wojciech|       Mieczykowski| 32|3781.23|
|474578|Aleksandra|Brzęczyszczykiewicz| 41|4045.36|
|159298|      Adam|               Glut| 38| 4091.0|
|513360|  Wojciech|             Wlotka| 64|4111.58|
|372482|  Wojciech|           Barański| 24|4213.19|
|103914|  Wojciech|           Kowalski| 64|4403.96|
|381730|Aleksandra|         Malinowski| 18|4419.66|
|493673|     Agata|               Glut| 43| 4428.0|
|168018|     Marek|           Barański| 39|4439.07|
+------+----------+-------------------+---+-------+
only showing top 10 rows



In [30]:
# wypisanie tabeli
spark.catalog.listTables()

[Table(name='employee_id_bucketed', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='lucky_employees', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [32]:
# usunięcie tabeli
spark.sql('DROP TABLE employee_id_bucketed')

DataFrame[]

In [33]:
# jeżeli dane, z którymi pracujemy zawierają stosunkowo niewiele różnorodnych wartości w danych kolumnach
# lub filtrowanie i obliczenia często odbywają się na podgrupach danych to lepsze efekty uzyskamy
# poprzez wykorzystanie możliwości partycjonowania tych danych, które to partycjonowanie
# będzie również odzwierciedlone w fizycznej strukturze plików na dysku twardym w hurtowni danych

# zobaczmy przykład poniżej

df.write.partitionBy("lastname").mode('overwrite').saveAsTable("employees_partitioned_lastname")

                                                                                

In [34]:
# dobrym pomysłem jest też określenie ilości bucketów wynikających z danych w konkretnej kolumnie
# i wykorzystanie do podziału
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.bucketBy.html
buckets = spark.sql("select distinct firstname from EMPLOYEE_DATA").count()
buckets

                                                                                

10

In [36]:
# widok danych podzielonych na partycję z punktu widzenia systemu plików
!ls Lab7/metastore_db/employees_partitioned_lastname

'lastname=Barański'		'lastname=Malinowski'	 'lastname=Wlotka'
'lastname=Brzęczyszczykiewicz'	'lastname=Mieczykowski'  'lastname=Wróblewski'
'lastname=Glut'			'lastname=Pysla'	  _SUCCESS
'lastname=Kowalski'		'lastname=Szczaw'


In [37]:
df.filter(df.lastname == 'Pysla').groupby('lastname').agg({'salary': 'avg'}).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#19], functions=[avg(salary#21)])
   +- Exchange hashpartitioning(lastname#19, 200), ENSURE_REQUIREMENTS, [plan_id=693]
      +- HashAggregate(keys=[lastname#19], functions=[partial_avg(salary#21)])
         +- Filter (isnotnull(lastname#19) AND (lastname#19 = Pysla))
            +- FileScan csv [lastname#19,salary#21] Batched: false, DataFilters: [isnotnull(lastname#19), (lastname#19 = Pysla)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark/work-dir/Lab7/employee_1m.csv], PartitionFilters: [], PushedFilters: [IsNotNull(lastname), EqualTo(lastname,Pysla)], ReadSchema: struct<lastname:string,salary:double>




In [38]:
%%time
df.filter(df.lastname == 'Pysla').groupby('lastname').agg({'salary': 'avg'}).show(10)



+--------+------------------+
|lastname|       avg(salary)|
+--------+------------------+
|   Pysla|7847.1464690294915|
+--------+------------------+

CPU times: user 10.4 ms, sys: 0 ns, total: 10.4 ms
Wall time: 1.11 s


                                                                                

In [39]:
spark.sql("select lastname, avg(salary) from employees_partitioned_lastname where lastname='Pysla' group by lastname").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#507], functions=[avg(salary#506)])
   +- Exchange hashpartitioning(lastname#507, 200), ENSURE_REQUIREMENTS, [plan_id=759]
      +- HashAggregate(keys=[lastname#507], functions=[partial_avg(salary#506)])
         +- FileScan parquet spark_catalog.default.employees_partitioned_lastname[salary#506,lastname#507] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/opt/spark/work-dir/Lab7/metastore_db/employees_partitioned_lastn..., PartitionFilters: [isnotnull(lastname#507), (lastname#507 = Pysla)], PushedFilters: [], ReadSchema: struct<salary:double>




In [40]:
%%time
spark.sql("select lastname, avg(salary) from employees_partitioned_lastname where lastname='Pysla' group by lastname").show(10)

+--------+------------------+
|lastname|       avg(salary)|
+--------+------------------+
|   Pysla|7847.1464690294915|
+--------+------------------+

CPU times: user 2.95 ms, sys: 1.09 ms, total: 4.03 ms
Wall time: 318 ms


In [41]:
spark.sparkContext.stop()