# Технологии обработки больших данных
Занятие 1. Введение
1. Административная часть
2. Обработка текстовых файлов
3. Функциональная парадигма программирования

### 1. Административная часть
 Бально-рейтинговая система:  

Текущий контоль успеваемости:
* Выполнение домашних заданий - 30 баллов
* Активность на занятих - 10 баллов

Промежуточная аттестация:
* Теоретическая часть - 30 баллов
* Практическая часть - 30 баллов

За практическую часть возможно будет засчитан индивидуальный проект (если не будет курсовой работы).

### 2. Обработка текстовых файлов
Log generation  

Пусть у нас есть интернет-магазин, который посещают 10 посетителей в час. Итого за год имеем 87 600 посетителей (у wildberries - около 40 млн).  

Каждый посетитель совершает какие-то действия на сайте, которые записываются в log файл. Мы имитируем эти действия с помощью 100 случайных слов (из трех случайных букв).  

Наш лог файл получился небольшого размера и легко помещается в оперативную память одной машины. Но так бывает не всегда. Есть два пути решения проблемы.

<img src="https://github.com/balezz/learning_spark/blob/master/img/scaling.png?raw=1">

<img src="https://github.com/balezz/learning_spark/blob/master/img/threads.png?raw=1">

<img src="https://github.com/balezz/learning_spark/blob/master/img/pipeline.jpg?raw=1">

In [1]:
import random, string, datetime

In [2]:
letters = string.ascii_lowercase

LOG_SIZE = 10*24*365 # 87600

def get_random_word(length=3):
   return ''.join(random.choice(letters) for i in range(length))

def get_line(length=100):
  t = str(datetime.datetime.now())
  words = [get_random_word() for n in range(length)]
  return t + ' ' + ' '.join(w for w in words) + '\n'

lines = [get_line() for l in range(LOG_SIZE)]

with open('log.txt', 'w') as f:
  f.writelines(lines)

In [3]:
with open('log.txt') as f:
  lines = f.readlines()

print(f'Total lines - {len(lines)}. First 5:')
lines[:5]

Total lines - 87600. First 5:


['2025-03-29 09:52:16.778592 wmz juo byp hbx ncp gey rzn hwl pzv ymo ikq uuw hck axv icj kwj ziq pjh xbf ijm uim yhd zdt icb kfl tia upy mmd knu ddd she eqo tca mhb ubn zly gmj qaz pid iqf pum vve gsd xkv tqn anx iut pbi yty jem drw ywc lfr hlp xth xzt ecn bsb cqu jnw gym vii smr tdc xew yyf gxv hva hoo evr lyf lmj ewe gok zpg syn yvl ltd ybt dze pua igp zpu vdr bcz xpb bnr gud bie aky enm kus amy xwe zkm ayq enc jqe lwo vex\n',
 '2025-03-29 09:52:16.782745 avd tlh buc lyb wqj gaf ajl egx pea eol scd hua fim nkh tuz yjb kby efb jqg bsq nmi glf gbh ied exy arr fsz lri kgt dcv uyc plx oyc ibn dhs aae ltt ndl gtx olc xwa nnq hmz fvp evo fae yvi rzp zuj ojj kvy bui abn rww hab rva qre qvl vkd wcs fzb lae mev iud nbt yyx nvi flx hzb jsg pky ckj qqg cgh hku axy kkf okw stk lon axv fsl qek smp kbo tju uli cva zvh qqc rdb vup gxl fdl jqu xoy kim awj uvw wep\n',
 '2025-03-29 09:52:16.783065 uci jir nbg mfo sea duq tjy pum xud ukb xau kcj ixo xjr ljo ugd lnd dxd ceq oqv vpu vex het aad sjg jso d

Проведем простую аналитику: сколько раз встречается слово 'cat' и слово 'dog' в нашем файле.

In [4]:
%%time

cat_count = 0
dog_count = 0

for line in lines:
  if 'cat' in line:
    cat_count += 1
  if 'dog' in line:
    dog_count += 1

print(f'cat - {cat_count}, dog - {dog_count}')

cat - 487, dog - 536
CPU times: user 65.1 ms, sys: 1.98 ms, total: 67.1 ms
Wall time: 67.8 ms


Немного усложним задачу: сколько раз встречается каждое слово?

In [5]:
from collections import Counter

words = []

for line in lines:
  words += line.split(' ')[2:]

In [6]:
# 87600 * 100
len(words)

8760000

In [7]:
%%timeit
counts = Counter(words)

2.03 s ± 224 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Попробуем ускорить вычисления за счет разделения списка слов

In [8]:
%%time

LEN = len(words) // 2

words1 = words[:LEN]
words2 = words[LEN:]

counts1 = Counter(words1)
counts2 = Counter(words2)

counts3 = counts1 + counts2

CPU times: user 2.13 s, sys: 68.9 ms, total: 2.2 s
Wall time: 2.22 s


In [11]:
counts3 == counts

NameError: name 'counts' is not defined

Или с помощью параллельных процессов

In [None]:
from multiprocessing import Process, Queue, cpu_count

q = Queue()

def count(q, words):
    c = Counter(words)
    q.put(c)

c = cpu_count()
print(f'CPU count - {c}')

CPU count - 4


In [None]:
%%timeit
p1 = Process(target=count, args=(q, words1))
p1.start()

ac1 = q.get()
p1.join()

878 ms ± 21.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%%timeit
p1 = Process(target=count, args=(q, words1))
p1.start()

p2 = Process(target=count, args=(q, words2))
p2.start()

ac1 = q.get()
ac2 = q.get()

p1.join()
p2.join()

async_count = ac1 + ac2

NameError: name 'Process' is not defined

In [None]:
async_count == counts

### 3. Функциональная парадигма программирования
What is map reduce?  

Greeting programming language paradigm:
1. Imperative:
* Procedural (Fortran, Pascal, C ...)
* Object Oriented (Smalltalk, C++, Java ...)
* Parallel Processing (Ada, Occam, cuda ...)
2. Declarative:
* Logic (Prolog ...)
* Functional (Lisp, Scheme ...)
* Database (SQL ...)

Большинство современных языков поддерживают несколько парадигм программирования.

#### High-order functions (Функции высшего порядка)
#### map

In [12]:
data = list(range(5))
data

[0, 1, 2, 3, 4]

In [15]:
%%time
def my_cube(x):
  return x*x*x

my_map = map(my_cube, data)

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 10.5 µs


In [None]:
%%time
my_list = list(my_map)

CPU times: user 31.6 ms, sys: 8.09 ms, total: 39.7 ms
Wall time: 51.8 ms


In [None]:
%%timeit
my_list = []
for d in data:
    my_list.append(my_cube(d))

13.2 ms ± 503 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [None]:
%%timeit
list(map(lambda x: x*x*x, data))

8.79 ms ± 108 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


#### filter

In [20]:
list(filter(lambda x: x%2==0, data))

[0, 2, 4]

#### reduce

In [None]:
data

[0, 1, 2, 3, 4]

In [None]:
import functools as f

def my_sum(a, b):
  return a + b

f.reduce(my_sum, data)

10

In [None]:
f.reduce(lambda a, b: a + b, data)

10

# Домашнее задание

#### 1. Переведите все слова в лог-файле в uppercase, в решении используйте map

In [22]:
# expected
log_list = """['2022-02-12 14:07:14.497548 ERK QJF REP DQL ZSC SHL BNB AAP OAU XML BKU PUD OYU LRZ KUF LYC XKH WVQ UVE LEW RVN HOD ZFJ SHF PAS YFC VTZ JPA GHU TJD HSS MPS NEO EYJ GNY VDD XLD XIK UMZ KBN WNE GYW MBX LNI TTA QCN BHF KUZ WCQ QBJ SHI YHR DLL MVH SPV BTG MTJ XLN AIY HKA BQO ZZH BBI XRH NEP CNK KUV RNE FNU SCZ KFV EHM WYI EWE OAP UWF IVJ HFL ODJ SOE RTE FXE OLY TOG VUX KYP RUK MCQ COO TBD XBI CUG TLB GJY GKQ TEO UZF SIR RGK KMP\n',
 '2022-02-12 14:07:14.498591 VIS BRA CHC GAM FYT VCP DWO VHT NEM LAA CXS YRC KOY ZTJ TZC MJI XPJ QBA XMT PJX QBE POY EUC MLH FPU CNX PSG QLN QMF APS URL GZZ CSU FYY IME LJG GGI LJA EDD TVG SJM JIZ EQM LRK GUN AQZ IGQ SBA LZU HUJ EQM HZJ KKJ JHE GJW ELG RXA RLH VBL YMR LDJ ISY VJX GVV UFX NYY LHG QYV XMT JET KRJ MNJ JKH MJL WVT RVW MOJ OES QXA ZJW DQV VNE ZEP UGV GXZ YCB FPS BZF ENH WSH PJD WED CLX WBN QGS BOP QFF ZWK CRQ XJR\n',
 '2022-02-12 14:07:14.499505 WXG DLB AKS IXN JEK QGD TUY KSU HDF CEY JEQ GLU MAW TEW YZO AVC IBD YSS HYL YBO DCF PNT UTB MZW ZVJ EYN YYW FLM TIL WYG WJR ETO XMD KRL OOE MEN HED QRJ JZC SFT RXN JNC PEG CWV TKO XRU YEP AQW ZZV VNU PYU THS PML ZEG XIK RYF DRS VFQ GLB FDZ SCT KEP GXP NKL XTE TRF LIX CDB ARM NBT GCV BNU AFX PPG PRV DYX AJN ZDJ QTU IRB QLI LNG RJR LFN KBD JTV IPX OIV YUS DZR MXG QHM CTI LLM IUT ZCK RCT QPN LQS OYO\n',
 '2022-02-12 14:07:14.500389 WOY WFE LUU BWK YBR FNU BRY CFR YWS TVA ABR TJI MWU THK NMJ HLB NFG ICR ITF DAY ZFB ZZI QHA VLA KZW WEW QYY IGI XLW VWB RYI HDH CTN MQE OYI ICD IEC CGL OIL GYR ZYI SGW DWU KQB CTR YIZ XHT EVM OOA UAF JUO NBK XMD VOM ZTQ GLO FTZ BPZ XNP FNH IQJ DAO CYN CDE MXS CEB VUV GEC BZI SRF XYH PWE UEZ QVA FKN WKA TJM VUI HSI EGS UTL WBT TUG JYC KOH NLO AYF QAZ FYM AFA WPH WKV DMA JDP NDW ROF SUQ QOK QZP PFZ\n',
 '2022-02-12 14:07:14.501292 ZWL QPO HOC WLB HRM DOM DIQ CKZ INA HBY DQR OMQ UWN NCW YIY GYB ZZZ HIC CHM LUY CIX MYD PPH NEK VUP IFH VRW XSX RMI RFH QLW WBT LDQ BXN YMA HXX INQ YZS NZI LCN AGU GNW MGB XYP WTM QMI QJC UOU RAC UAR NPD ELS CIN OMN QRJ KLU AIZ OVL GLF VIB QHF SRX CLH DGJ DQY CXK OQO IRC TKK PFZ PVE FBN NQT NDQ VMC ZGT KBI SOD QBB MQR LSL YMM MIQ HZF MGQ ZLM SZO YOL HWC WIY RAI NKK VAL VWM FPW PHL TZS DVU WKQ DNJ\n']"""

In [27]:
uppercase_string = " ".join(map(str.upper, log_list.split()))

print(uppercase_string)

['2022-02-12 14:07:14.497548 ERK QJF REP DQL ZSC SHL BNB AAP OAU XML BKU PUD OYU LRZ KUF LYC XKH WVQ UVE LEW RVN HOD ZFJ SHF PAS YFC VTZ JPA GHU TJD HSS MPS NEO EYJ GNY VDD XLD XIK UMZ KBN WNE GYW MBX LNI TTA QCN BHF KUZ WCQ QBJ SHI YHR DLL MVH SPV BTG MTJ XLN AIY HKA BQO ZZH BBI XRH NEP CNK KUV RNE FNU SCZ KFV EHM WYI EWE OAP UWF IVJ HFL ODJ SOE RTE FXE OLY TOG VUX KYP RUK MCQ COO TBD XBI CUG TLB GJY GKQ TEO UZF SIR RGK KMP ', '2022-02-12 14:07:14.498591 VIS BRA CHC GAM FYT VCP DWO VHT NEM LAA CXS YRC KOY ZTJ TZC MJI XPJ QBA XMT PJX QBE POY EUC MLH FPU CNX PSG QLN QMF APS URL GZZ CSU FYY IME LJG GGI LJA EDD TVG SJM JIZ EQM LRK GUN AQZ IGQ SBA LZU HUJ EQM HZJ KKJ JHE GJW ELG RXA RLH VBL YMR LDJ ISY VJX GVV UFX NYY LHG QYV XMT JET KRJ MNJ JKH MJL WVT RVW MOJ OES QXA ZJW DQV VNE ZEP UGV GXZ YCB FPS BZF ENH WSH PJD WED CLX WBN QGS BOP QFF ZWK CRQ XJR ', '2022-02-12 14:07:14.499505 WXG DLB AKS IXN JEK QGD TUY KSU HDF CEY JEQ GLU MAW TEW YZO AVC IBD YSS HYL YBO DCF PNT UTB MZW ZVJ EYN YYW F

#### 2. Создайте новый список с логами, в которых будут только слова из white_list.txt, в решении используйте map и filter

In [25]:
# expected
white_list = """['2022-02-1214:07:14.497548 sir',
 '2022-02-1214:07:14.498591 jet',
 '2022-02-1214:07:14.499505 peg arm',
 '2022-02-1214:07:14.500389 day tug',
 '2022-02-1214:07:14.501292 ']"""

In [26]:
filtered_logs = list(
    map(lambda line: " ".join(filter(lambda word: word in white_list, line.split())), log_list)
)

# Вывод результата
print(filtered_logs)

['[', "'", '2', '0', '2', '2', '-', '0', '2', '-', '1', '2', '', '1', '4', ':', '0', '7', ':', '1', '4', '.', '4', '9', '7', '5', '4', '8', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 

#### 3. Посчитайте количество слов из white_list в файле log.txt

In [None]:
# expected
"""Counter({'arm': 158,
         'tug': 154,
         'may': 175,
         'tan': 194,
         'cry': 184,
         'fin': 174... })"""