In [None]:
print("pierwsze polecenie uruchomi context sparka")

In [None]:
# Ładujemy dodatkowe funckje z biblioteki Spark z modułu SQL
from pyspark.sql.functions import *

### Przygotwanie danych do ćwiczenia

In [None]:
# Wpisujemy nazwę bazy danych, którą utworzyliśmy w Hive
nazwa_mojej_bazy_danych_w_hive='<wprowadz_swoja_nazwe_bazy_w_Hive>'

In [None]:
# Wybieramy bazę danych dla Sparka w ramach, której będziemy pracować
spark.sql('use {db}'.format(db=nazwa_mojej_bazy_danych_w_hive))
print("Uzywana baza danych: {db}".format(db=nazwa_mojej_bazy_danych_w_hive))

In [None]:
# Przygotowujemy mapowanie do obiektu, który będzie korzystał z tabeli w Hive
posts = spark.table('posts')

In [None]:
# Przygotowujemy obiekt na bazie posts, który:
#   - będzie zawierał dodatkową kolumnę year - wykorzystamy ją do partycjonowania danych
#   - wybieramy również inne kolumny, które nas interesują
posts_with_year = posts \
    .withColumn('year', col('creationdate').substr(1, 4)) \
    .select('year','id','posttypeid','viewcount','body','title','owneruserid','answercount')

In [None]:
# Sprawdzamy jak wygląda nowo przygotowana struktura
posts_with_year.printSchema()

In [None]:
# Pobieramy 10 rekordów aby sprawdzić czy dane zgadzają się z oczekiwaniami
posts_with_year.show(10)

In [None]:
# Sprawdzamy, ile lat mamy w naszym zbiorze - za chwilę będzie to nasz klucz partycjonowania
posts_with_year.select('year').distinct().orderBy('year').show()

### Zapis danych do bucketu w S3


In [None]:
# Przygotowujemy nazwe katalogu do którego zapiszemy dane na S3
# Bucket jest wspólny dla wszystkich dlatego tworzymy dedykowana ścieżkę dla każdego z uczestników szkolenia
path_on_s3_to_write_data = "s3://kompleksowe-szkolenie-bigdata-write-exercise-v2/{db}/posts_with_year_partitions" \
    .format(db=nazwa_mojej_bazy_danych_w_hive)

In [None]:
print("Sciezka do ktorej zapiszemy dane na S3: {path}".format(path=path_on_s3_to_write_data))

In [None]:
# Wykonujemy operację "write" w trybie "overwrite", partycjonując nasze dane przy zapisie za pomocą kolumny "year"
# do nazwy katalogu przechowywanego w zmiennej "path_on_s3_to_write_data"
# Dane zapisujemy używając formatu Parquet
posts_with_year \
    .write \
    .mode("overwrite") \
    .partitionBy("year") \
    .parquet(path_on_s3_to_write_data)

### Odczyt danych z bucketu S3 w Sparku

In [None]:
# Do nowego obiektu mapujemy ściężkę porzednio wykorzystywaną, aby zweryfikować czy nasze dane zostały poprawnie zapisane
posts_with_year_s3_read = spark.read.parquet(path_on_s3_to_write_data)

In [None]:
# Sprawdzamy czy Spark poprawnie odczytał schemat danych - schemat danych jest zapisywany w plikach Parquet
# Spark ma mechanizm automatycznego inferowania schemy z danych, które zna - w tym przypadku Parquet
posts_with_year_s3_read.printSchema()

In [None]:
# Sprawdzamy czy wszystkie lata, których oczekujemy, znajdują się w zbiorze
posts_with_year_s3_read.select('year').distinct().orderBy('year').show()

In [None]:
# Sprawdzmy, czy dane poprawnie wyglądają
posts_with_year_s3_read.filter(col('year')==2014).show(20)

### Odtwarzanie tabeli w Hive na podstawie danych z S3

Ten przykład można wykonać zarówno z poziomu Sparka jak i Hive - polecenia działają w obu środowiskach

Tworzymy tabelę typu external oraz odtwarzamy w niej strukturę partycji na podstawie tego co zostało zapisane w S3

**UWAGA!** W poniższym poleceniu musisz zmodyfikować "location" dla tabeli - ścieżka, którą należy podać została wygenerowana wcześniej - zmienna "path_on_s3_to_write_data" w kroku zapisywania danych do S3

In [None]:
%%sql
CREATE EXTERNAL TABLE posts_with_year
(
    id long,
    posttypeid int,
    viewcount long,
    body string,
    title string,
    owneruserid long,
    answercount long
)
partitioned by (year integer)
stored as parquet
location '<wproadz_sceizke_zapisu_do_S3>'

In [None]:
%%sql
show partitions posts_with_year

In [None]:
%%sql
alter table posts_with_year recover partitions;

In [None]:
%%sql
show partitions posts_with_year

In [None]:
%%sql
select  * from posts_with_year where year = 2014 
limit 20