In [1]:
import dask
import dask.bag as db
import dask.dataframe as dd
from datetime import datetime
import pandas as pd
import json
import os

### Task 1
Poniżej zaprezentowany kod jest podejściem sekwencyjnym do wykonania zadania przetworzenia logów (parsowanie, konwersja daty) i w takiej formie nie można zrównoleglić go tak jak zostało to zaprezentowane na przykładzie powyżej. Przekształcenie łańcucha daty na obiekt datetime wymaga najpierw wykonania parsowania pliku. Zastanów się i spróbuj przerobić to rozwiązanie tak, aby możliwe było użycie wywołań dask delayed w celu zrównoleglenia części funkcji, np. parsowanie danych w celu pobrania wartości kolumn niezależnie (tylko jednej na raz). Dane końcowe możesz zapisać do dask DataFrame, a następnie do plików parquet.

Aby zyskać jakieś porównanie między wersją sekwencyjną a zrównolegloną, dodaj odpowiednio dużo danych ze zbiorów podlinkowanych powyżej oraz zmierz czas obu rozwiązań.

In [2]:
def parse(inp: str):
    record = {}
    
    date_start = inp.find('[') + 1
    date_end = inp.find(']')
    date_s = slice(date_start, date_end)

    level_start = inp.find('[', date_end) + 1
    level_end = inp.find(']', level_start)
    level_s = slice(level_start, level_end)

    client_start = inp.find('[', level_end)
    client_end = inp.find(']', client_start)

    record["date"] = inp[date_s]    
    record["level"] = inp[level_s]
    record["client"] = "" if client_start == -1 else inp[client_start + 8: client_end]
    record["message"] = inp[client_end + 2:] if record["client"] else inp[level_end + 2:]
    
    return record

def convert_date(rec):
    rec["date"] = datetime.strptime(rec["date"], "%a %b %d %H:%M:%S %Y")

    return rec

In [3]:
bag = db.read_text('data/Apache.log').filter(lambda x: 'script not found or unable to stat' not in x).map(lambda x: x.strip())
lines = bag.compute()
lines[0:10]

['[Thu Jun 09 06:07:04 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:04 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:04 2005] [notice] suEXEC mechanism enabled (wrapper: /usr/sbin/suexec)',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: generating secret for digest authentication ...',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: done',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )']

In [4]:
start = datetime.now()

output = []
for line in lines:
    record = parse(line)
    record = convert_date(record)
    output.append(list(record.values()))
    
df = pd.DataFrame(output, columns=["date", "level", "client", "message"])

ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet('data/log_data1.parquet')

datetime.now() - start

datetime.timedelta(microseconds=955508)

In [5]:
start = datetime.now()

delayed_records = [dask.delayed(parse)(line) for line in lines]
delayed_converted = [dask.delayed(convert_date)(rec) for rec in delayed_records]

final_result = dask.compute(*delayed_converted)
df = pd.DataFrame(final_result, columns=["date", "level", "client", "message"])

ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet('data/log_data2.parquet')

datetime.now() - start

datetime.timedelta(seconds=15, microseconds=834998)

## Task 2
Wykorzystując przykłady zaprezentowane w labie wykonaj na danych people (możesz zmniejszyć lub zwiększyć ich wolumen w zależności od potrzeb) operację z użyciem Dask bag, która polegać będzie na przetworzeniu wszystkich plików i zapisaniu do plików o nazwie expired_{partition}.json rekordów, których ważność karty kredytowej wygasła (jest to wartość w formacie miesiąc/rok). Zapisując ustaw finalną liczbę plików na 10 jeżeli była inna. Możesz to zrobić poprzez zmianę ilość partycji dask bag (patrz link do API na początku laba).

In [7]:
b = dask.datasets.make_people(npartitions=100, records_per_partition=10000)
b.map(json.dumps).to_textfiles(os.path.join('data', '*.json'))

['D:/Projekty/BigDataAnalysis/lab_03/tasks/data/00.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/01.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/02.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/03.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/04.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/05.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/06.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/07.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/08.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/09.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/10.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/11.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/12.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/13.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/14.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/15.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/16.json',
 'D:/Projekty/

In [8]:
def is_card_expired(record):
    expiration = record['credit-card']['expiration-date']
    exp_month, exp_year = map(int, expiration.split('/'))
    current_year = datetime.now().year
    current_month = datetime.now().month
    exp_year += 2000
    return (exp_year < current_year) or (exp_year == current_year and exp_month < current_month)

In [9]:
people = db.read_text(os.path.join('data', '*.json')).map(json.loads)
expired_cards = people.filter(is_card_expired)

expired_cards = expired_cards.repartition(npartitions=10)
expired_cards.map(json.dumps).to_textfiles(os.path.join('data/cards', 'expired_*.json'))

['D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_0.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_1.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_2.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_3.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_4.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_5.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_6.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_7.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_8.json',
 'D:/Projekty/BigDataAnalysis/lab_03/tasks/data/cards/expired_9.json']

## Task 3
Wybierając z danych people dane tylko osób dorosłych (zaprezentowane w przykładach w tym labie) przechowaj je w obiekcie typu bag, a następnie zapisz je do dask dataframe za pomocą metody to_dataframe (pamiętaj o tym jaka jest aktualna struktura pojedynczego rekordu), a następnie zapisz do jednego pliku w formacie parquet.

In [10]:
def flatten_record(record):
    flattened = {}
    flattened['age'] = record['age']
    flattened['name'] = ' '.join(record['name'])
    flattened['occupation'] = record['occupation']
    flattened['telephone'] = record['telephone']

    flattened['address'] = record['address']['address']
    flattened['city'] = record['address']['city']

    flattened['credit_card_number'] = record['credit-card']['number']
    flattened['credit_card_expiration'] = record['credit-card']['expiration-date']
    
    return flattened

In [15]:
adults = people.filter(lambda record: record['age'] >= 18).map(flatten_record)
ddf = adults.to_dataframe()
ddf.repartition(npartitions=1).to_parquet('data/adults/adults.parquet')