In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 42.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=c546096b949b9979e437a4428ff959395bb0caf25c2d34cac8a93f3a7e7b4ffb
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
The 

In [149]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
id='1f_9EbnywCj35EBUA32sueigxBjBJwALr'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('War and Peace by Leo Tolstoy (ru).txt')

In [4]:
import pandas as pd

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import string

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.window import Window


In [5]:
conf = SparkConf().set("spark.ui.port", "4050")

sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [6]:
spark

In [7]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2020-12-19 11:19:19--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 34.196.3.7, 34.205.198.58, 3.211.100.25, ...
Connecting to bin.equinox.io (bin.equinox.io)|34.196.3.7|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13773305 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2020-12-19 11:19:19 (68.8 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13773305/13773305]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range


Подсчитай кол-во слов в документе "War and Peace by Leo Tolstoy (ru).txt"

В начале имплементируем свой hadoop streaming код, для этого нам нужен маппер, отдающий строки вида "word\t1", а также редьюсер, который будет схлопывать строки с одинаковыми ключами

Все данные, чтобы не держать их в памяти можно записывать в файл, а shuffle имплементировать через sort


In [8]:
def clean_and_split(line):
  """
  Очищаем строку от лишних символов и возвращаем список слов
  """
  filter_chars = string.punctuation + string.digits + '–«»°—№…'
  line = line.lower().strip(filter_chars)
  words = line.split()
  mapped = list(map(lambda word: word.strip(filter_chars), words))
  filtered = list(filter(lambda word: word != '', mapped))
  return filtered

In [9]:
def mapper(file_name, out_file):
  """
  Реализация маппера, использует для записи файл, переданный через out_file
  """
  with open(out_file, 'w') as out:
    with open(file_name, 'r', encoding='cp1251') as f:
      for line in f:
        words = clean_and_split(line)
        for word in words:
          if word:
            out.write('{}\t1\n'.format(word))


In [10]:
# Маппим
in_file_name = 'War and Peace by Leo Tolstoy (ru).txt'
out_file = 'out.txt'
mapper(in_file_name, out_file)

In [11]:
# Имитация shuffle
!sort -o out.sorted.txt out.txt

Так как файл имеет достаточно маленький размер, все данные можно передать на один редьюсер.

In [12]:
def reducer(file_name, out_file):
  """
  Считаем количество слов, пишем в out_file
  """
  current_word = None
  current_count = 0
  word = None
  with open(out_file, 'w') as out:
    with open(file_name, 'r') as f:
      for line in f:
        line = line.strip()
        word, count = line.split('\t', 1)
        count = int(count)

        if current_word == word:
          current_count += count
        else:
          if current_word:
            out.write('{}\t{}\n'.format(current_word, current_count))
          current_count = count
          current_word = word
      if current_word == word:
        out.write('{}\t{}\n'.format(current_word, current_count))



In [13]:
# Редьюсер
file_name = 'out.sorted.txt'
out_file = 'out2.txt'
reducer(file_name, out_file)

In [14]:
# Читаем данные, которые отдал редьюсер
words_df = pd.read_csv(out_file, sep='\t', names=['word', 'count'])

In [15]:
# Топ-30 часто встречаемых слов книги
words_df.sort_values('count', ascending=False)[:30]

Unnamed: 0,word,count
16540,и,21758
6056,в,11280
24048,не,8996
51374,что,7999
27454,он,7593
22702,на,6860
41249,с,6008
17925,как,4210
13525,его,4026
17777,к,3516


Теперь решим ту же задачу с использованием pyspark

In [16]:
# textFile ожидает файл в utf8, чтобы не иметь проблем с кодировкой сразу перекодируем файл
!iconv -f cp1251 -t utf8 -o text_utf8.txt War\ and\ Peace\ by\ Leo\ Tolstoy\ \(ru\).txt
file_name = 'text_utf8.txt'

In [17]:
# Делаем ровно то же самое, сначала считываем файл и очищаем, после маппим каждое слово аналогично
# нашей реализации, а в reduceByKey происходит ровно то же, что и в нашем редьюсере
words = sc.textFile(file_name).flatMap(lambda line: clean_and_split(line))
reduced = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [140]:
# Выводим топ-30 часто встречаемых слов
words_sparkdf = reduced.toDF(['word', 'count'])
words_sparkdf.sort(col('count').desc()).limit(30).toPandas()


Unnamed: 0,word,count
0,и,21758
1,в,11280
2,не,8996
3,что,7999
4,он,7593
5,на,6860
6,с,6008
7,как,4210
8,его,4026
9,к,3516


Как можно увидеть, мы получили одинаковый результат

In [20]:
id='13yfAoONwq4rS5XrTv3IrcqcFcdgfvK9V'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-train.txt')

id='1VE_9x0LQvOJpHXbXp_RMPl3Q4wRUuOok'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('mnist-digits-test.txt')

Необходимо обучить модель используя Spark MLlib (модель на ваш выбор, например Decision Tree) и получить accuracy.
Подробнее тут: https://spark.apache.org/docs/latest/ml-classification-regression.html

Данные в файлах лежат в формате libsvm, в начале считываем их и индексируем label, а также скейлим наши признаки с помощью MinMaxScaler

In [141]:
train_rdd = spark.read.format("libsvm").load('mnist-digits-train.txt')
test_rdd = spark.read.format("libsvm").load('mnist-digits-test.txt')

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
train_rdd = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(train_rdd).transform(train_rdd)
test_rdd = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(test_rdd).transform(test_rdd)


Как можно заметить ниже, выборка сбалансирована

In [142]:
(train_rdd
  .groupby('label')
  .count()
  .orderBy('label')
  .show())

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 5923|
|  1.0| 6742|
|  2.0| 5958|
|  3.0| 6131|
|  4.0| 5842|
|  5.0| 5421|
|  6.0| 5918|
|  7.0| 6265|
|  8.0| 5851|
|  9.0| 5949|
+-----+-----+



В качестве модели выберем дерево решений, получаем следующий результат на тренировочных данных

In [143]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", maxDepth=10)

pipeline = Pipeline(stages=[labelIndexer, dt])
model = pipeline.fit(train_rdd)

predictions = model.transform(train_rdd)

predictions.select("prediction", "indexedLabel", "features", "scaledFeatures").show(5)

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy(on train) = {}".format(accuracy))

+----------+------------+--------------------+--------------------+
|prediction|indexedLabel|            features|      scaledFeatures|
+----------+------------+--------------------+--------------------+
|       9.0|         9.0|(780,[152,153,154...|(780,[0,1,2,3,4,5...|
|       5.0|         5.0|(780,[127,128,129...|(780,[0,1,2,3,4,5...|
|       8.0|         8.0|(780,[160,161,162...|(780,[0,1,2,3,4,5...|
|       0.0|         0.0|(780,[158,159,160...|(780,[0,1,2,3,4,5...|
|       4.0|         4.0|(780,[208,209,210...|(780,[0,1,2,3,4,5...|
+----------+------------+--------------------+--------------------+
only showing top 5 rows

Accuracy(on train) = 0.8987


Accuracy по каждому классу на трейне:

In [144]:
(predictions
  .withColumn('matched', expr('cast(prediction == indexedLabel  as int)'))
  .groupby('indexedLabel')
  .agg(avg('matched'))
  .orderBy('indexedLabel')
  .show())

+------------+------------------+
|indexedLabel|      avg(matched)|
+------------+------------------+
|         0.0|0.9606941560367843|
|         1.0| 0.906464485235435|
|         2.0|0.8587506116457347|
|         3.0|0.8892245720040282|
|         4.0|0.8767860144562112|
|         5.0|0.9417524902920817|
|         6.0|0.9246367015883744|
|         7.0|0.8742095368313109|
|         8.0|0.8772680588839439|
|         9.0|0.8664453052942261|
+------------+------------------+



Accuracy на тесте:

In [145]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

test_predictions = model.transform(test_rdd)
test_accuracy = evaluator.evaluate(test_predictions)
print("Accuracy(on test) = {}".format(test_accuracy))

Accuracy(on test) = 0.8677


Accuracy по каждому классу на тесте:

In [146]:
(test_predictions
  .withColumn('matched', expr('cast(prediction == indexedLabel  as int)'))
  .groupby('indexedLabel')
  .agg(avg('matched'))
  .orderBy('indexedLabel')
  .show())

+------------+------------------+
|indexedLabel|      avg(matched)|
+------------+------------------+
|         0.0| 0.958590308370044|
|         1.0|0.8842412451361867|
|         2.0|0.8227722772277227|
|         3.0|0.8517441860465116|
|         4.0|0.8602576808721506|
|         5.0| 0.923469387755102|
|         6.0|0.8789144050104384|
|         7.0|0.8100616016427105|
|         8.0|0.8441955193482689|
|         9.0|0.8262331838565022|
+------------+------------------+



In [None]:
id='1kUIrskM0zNH8u71G9M1BkHjRQYxvgAvh'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('data.zip')
!unzip data.zip

Для следующего задания считаем и подготовим данные.

Колонки с датами приведем к соответствующему типу, чтобы их можно было использовать в запросах.

Также для запросов с помощью SQL создадим временные таблицы для каждого набора данных.

In [81]:
from pyspark.sql.types import DateType, IntegerType
from datetime import datetime

todate = udf(lambda x: datetime.strptime(x, '%d.%m.%y'), DateType())

country_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/country.csv"))
country_rdd.registerTempTable("country")

departments_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/departments.csv"))
departments_rdd.registerTempTable("departments")

employees_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/employees.csv"))
employees_rdd = employees_rdd.withColumn('HIRE_DATE', todate(col('HIRE_DATE')))
employees_rdd.registerTempTable("employees")

job_history_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/job_history.csv"))
job_history_rdd = job_history_rdd.withColumn('START_DATE', todate(col('START_DATE')))
job_history_rdd = job_history_rdd.withColumn('END_DATE', todate(col('END_DATE')))
job_history_rdd.registerTempTable("job_history")

jobs_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/jobs.csv"))
jobs_rdd.registerTempTable("jobs")

locations_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/locations.csv"))
locations_rdd.registerTempTable("locations")

regions_rdd = (spark.read
               .options(header=True, delimiter='\t', inferSchema=True)
               .csv("data/regions.csv"))
regions_rdd.registerTempTable("regions")

Таблицы выглядят следующим образом:

In [72]:
print('Country')
country_rdd.show(2)
print('Departments')
departments_rdd.show(2)
print('Employees')
employees_rdd.show(2)
print('Job History')
job_history_rdd.show(2)
print('jobs')
jobs_rdd.show(2)
print('Locations')
locations_rdd.show(2)
print('Regions')
regions_rdd.show(2)

Country
+----------+------------+---------+
|COUNTRY_ID|COUNTRY_NAME|REGION_ID|
+----------+------------+---------+
|        AR|   Argentina|        2|
|        AU|   Australia|        3|
+----------+------------+---------+
only showing top 2 rows

Departments
+-------------+---------------+----------+-----------+
|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+---------------+----------+-----------+
|           10| Administration|       200|       1700|
|           20|      Marketing|       201|       1800|
+-------------+---------------+----------+-----------+
only showing top 2 rows

Employees
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER| HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|    

Далее результаты для каждого задания будут находиться с помощью функций pyspark, а также с помощью sql запросов к созданным временным таблицам

Кто получает больше всего? Кто меньше всего?

Больше всего из всех сотрудников получает:

In [30]:
(employees_rdd
 .filter(col('SALARY') == employees_rdd.agg({"SALARY": "max"}).collect()[0][0])
 .show())

+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|SKING|515.123.4567| 17.06.03|AD_PRES| 24000|          null|      null|           90|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+



In [31]:
spark.sql("SELECT * FROM employees WHERE SALARY= (SELECT MAX(SALARY) from employees)").show()

+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|SKING|515.123.4567| 17.06.03|AD_PRES| 24000|          null|      null|           90|
+-----------+----------+---------+-----+------------+---------+-------+------+--------------+----------+-------------+



Меньше всего из всех сотрудников получает:

In [32]:
(employees_rdd
 .filter(col('SALARY') == employees_rdd.agg({"SALARY": "min"}).collect()[0][0])
 .show())

+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|  EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+
|        132|        TJ|    Olson|TJOLSON|650.124.8234| 10.04.07|ST_CLERK|  2100|          null|       121|           50|
+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+



In [33]:
spark.sql("SELECT * FROM employees WHERE SALARY= (SELECT MIN(SALARY) from employees)").show()

+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|  EMAIL|PHONE_NUMBER|HIRE_DATE|  JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+
|        132|        TJ|    Olson|TJOLSON|650.124.8234| 10.04.07|ST_CLERK|  2100|          null|       121|           50|
+-----------+----------+---------+-------+------------+---------+--------+------+--------------+----------+-------------+



Выведите топ 5 по зарплате.

In [34]:
(employees_rdd
 .orderBy('SALARY', ascending=False)
 .limit(5)
 .show())

+-----------+----------+---------+--------+------------------+---------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|      PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------------+---------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|      515.123.4567| 17.06.03|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|      515.123.4568| 21.09.05|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|      515.123.4569| 13.01.01|  AD_VP| 17000|          null|       100|           90|
|        145|      John|  Russell| JRUSSEL|011.44.1344.429268| 01.10.04| SA_MAN| 14000|           0,4|       100|           80|
|        146|     Karen| Partners|KPARTNER|011.44.1344.467268| 05.01.05| SA_MAN| 13500|           0,3|  

In [35]:
spark.sql("SELECT * FROM employees ORDER BY SALARY DESC LIMIT 5").show()

+-----------+----------+---------+--------+------------------+---------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|      PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------------+---------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|      515.123.4567| 17.06.03|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|      515.123.4568| 21.09.05|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|      515.123.4569| 13.01.01|  AD_VP| 17000|          null|       100|           90|
|        145|      John|  Russell| JRUSSEL|011.44.1344.429268| 01.10.04| SA_MAN| 14000|           0,4|       100|           80|
|        146|     Karen| Partners|KPARTNER|011.44.1344.467268| 05.01.05| SA_MAN| 13500|           0,3|  

Сколько всего регионов? Сколько работников в каждом регионе?

Количество регионов:

In [36]:
(regions_rdd
 .agg({"REGION_ID": "count"})
 .show()
)

+----------------+
|count(REGION_ID)|
+----------------+
|               4|
+----------------+



In [37]:
spark.sql("SELECT COUNT(*) FROM regions").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



Сколько работников в каждом регионе

In [38]:
print('Всего сотрудников: {}'.format(employees_rdd.count()))

Всего сотрудников: 106


In [39]:
(regions_rdd
 .join(country_rdd, on='REGION_ID', how='inner')
 .join(locations_rdd, on='COUNTRY_ID', how='inner')
 .join(departments_rdd, on='LOCATION_ID', how='inner')
 .join(employees_rdd, on='DEPARTMENT_ID', how='inner')
 .groupby('REGION_ID')
 .count()
 .show()
 )

+---------+-----+
|REGION_ID|count|
+---------+-----+
|        1|   36|
|        2|   70|
+---------+-----+



In [40]:
spark.sql("""SELECT r.REGION_ID, COUNT(e.EMPLOYEE_ID) FROM regions as r
INNER JOIN country as c ON r.REGION_ID == c.REGION_ID
INNER JOIN locations as l ON c.COUNTRY_ID == l.COUNTRY_ID
INNER JOIN departments as d ON d.LOCATION_ID == l.LOCATION_ID
INNER JOIN employees as e ON e.DEPARTMENT_ID == d.DEPARTMENT_ID
GROUP BY r.REGION_ID""").show()

+---------+------------------+
|REGION_ID|count(EMPLOYEE_ID)|
+---------+------------------+
|        1|                36|
|        2|                70|
+---------+------------------+



Выведите всех работников из Китая.

In [41]:
(employees_rdd
 .alias('employees')
 .join(departments_rdd, on='DEPARTMENT_ID', how='inner')
 .join(locations_rdd, on='LOCATION_ID', how='inner')
 .where(col('COUNTRY_ID') == 'CN')
 .select(['employees.*', 'COUNTRY_ID'])
 .show()
)

+-------------+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|COUNTRY_ID|
+-------------+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+----------+
+-------------+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+----------+



In [42]:
spark.sql("""SELECT e.*, COUNTRY_ID from employees as e
INNER JOIN departments as d ON d.DEPARTMENT_ID == e.DEPARTMENT_ID
INNER JOIN locations as l ON l.LOCATION_ID == d.LOCATION_ID
WHERE l.COUNTRY_ID == 'CN'""").show()

+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|EMAIL|PHONE_NUMBER|HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|COUNTRY_ID|
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+----------+
+-----------+----------+---------+-----+------------+---------+------+------+--------------+----------+-------------+----------+



Как можно заметить, сотрудников из Китая в таблицах нет. Можно убедиться в этом с помощью следующего запроса

In [43]:
spark.sql("""SELECT DISTINCT c.COUNTRY_ID FROM country as c
INNER JOIN locations as l ON c.COUNTRY_ID == l.COUNTRY_ID
INNER JOIN departments as d ON d.LOCATION_ID == l.LOCATION_ID
INNER JOIN employees as e ON e.DEPARTMENT_ID == d.DEPARTMENT_ID
""").show()

+----------+
|COUNTRY_ID|
+----------+
|        CA|
|        DE|
|        US|
|        UK|
+----------+



Укажите самую высокооплачиваемую должность.

In [44]:
(jobs_rdd
 .filter(col('MAX_SALARY') == jobs_rdd.agg({"MAX_SALARY": "max"}).collect()[0][0])
 .show())

+-------+---------+----------+----------+
| JOB_ID|JOB_TITLE|MIN_SALARY|MAX_SALARY|
+-------+---------+----------+----------+
|AD_PRES|President|     20080|     40000|
+-------+---------+----------+----------+



In [45]:
spark.sql("SELECT * FROM jobs WHERE MAX_SALARY=(SELECT MAX(MAX_SALARY) from jobs)").show()

+-------+---------+----------+----------+
| JOB_ID|JOB_TITLE|MIN_SALARY|MAX_SALARY|
+-------+---------+----------+----------+
|AD_PRES|President|     20080|     40000|
+-------+---------+----------+----------+



Выведите всех работников связанных с ИТ. Выведите их менеджеров. 

In [46]:
(employees_rdd
 .alias('e1')
 .join(employees_rdd.alias('e2'), on=col('e2.EMPLOYEE_ID') == col('e1.MANAGER_ID'), how='inner')
 .join(jobs_rdd.alias('j'), on=col('j.JOB_ID') == col('e1.JOB_ID'))
 .where(col('e1.JOB_ID') == 'IT_PROG')
 .select(['e1.*', 'e2.FIRST_NAME', 'e2.LAST_NAME'])
 .show()
 )

+-----------+----------+---------+--------+------------+---------+-------+------+--------------+----------+-------------+----------+---------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|FIRST_NAME|LAST_NAME|
+-----------+----------+---------+--------+------------+---------+-------+------+--------------+----------+-------------+----------+---------+
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567| 03.01.06|IT_PROG|  9000|          null|       102|           60|       Lex|  De Haan|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568| 21.05.07|IT_PROG|  6000|          null|       103|           60| Alexander|   Hunold|
|        105|     David|   Austin| DAUSTIN|590.423.4569| 25.06.05|IT_PROG|  4800|          null|       103|           60| Alexander|   Hunold|
|        106|     Valli|Pataballa|VPATABAL|590.423.4560| 05.02.06|IT_PROG|  4800|          null|       103|           60| Alexander|   Hunold|

In [47]:
spark.sql("""SELECT e1.*, e2.FIRST_NAME as manager_FN, e2.LAST_NAME as manager_LN
FROM employees as e1
INNER JOIN employees as e2 ON e2.EMPLOYEE_ID == e1.MANAGER_ID
INNER JOIN jobs as j ON j.JOB_ID == e1.JOB_ID
WHERE e1.JOB_ID = 'IT_PROG'
""").show()

+-----------+----------+---------+--------+------------+---------+-------+------+--------------+----------+-------------+----------+----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|manager_FN|manager_LN|
+-----------+----------+---------+--------+------------+---------+-------+------+--------------+----------+-------------+----------+----------+
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567| 03.01.06|IT_PROG|  9000|          null|       102|           60|       Lex|   De Haan|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568| 21.05.07|IT_PROG|  6000|          null|       103|           60| Alexander|    Hunold|
|        105|     David|   Austin| DAUSTIN|590.423.4569| 25.06.05|IT_PROG|  4800|          null|       103|           60| Alexander|    Hunold|
|        106|     Valli|Pataballa|VPATABAL|590.423.4560| 05.02.06|IT_PROG|  4800|          null|       103|           60| Alexander|    

Выведите имя и фамилию работника, его текущую и предыдущую должности и сколько полных недель и дней прошло с момент изменения. 



Количество полных дней и недель будем считать между текущей датой и датой окончания последней работы (END_DATE)

In [121]:
(job_history_rdd
 .alias('j')
 .withColumn("row_number", row_number().over(Window.partitionBy("EMPLOYEE_ID").orderBy(col("END_DATE").desc())))
 .where(col('row_number') == 1)
 .drop(col('row_number'))
 .join(employees_rdd.alias('e'), on='EMPLOYEE_ID', how='inner')
 .select(
     col('FIRST_NAME'),
     col('LAST_NAME'),
     col('e.JOB_ID').alias('cur_job'),
     col('j.JOB_ID').alias('prev_job'),
     datediff(current_date(), col('END_DATE')).alias('days'),
     ceil(datediff(current_date(), col('END_DATE')) / 7).alias('weeks') 
 )
 .show()
)

+----------+---------+-------+----------+----+-----+
|FIRST_NAME|LAST_NAME|cur_job|  prev_job|days|weeks|
+----------+---------+-------+----------+----+-----+
|     Neena|  Kochhar|  AD_VP|    AC_MGR|5758|  823|
|       Lex|  De Haan|  AD_VP|   IT_PROG|5262|  752|
|       Den| Raphaely| PU_MAN|  ST_CLERK|4737|  677|
|     Payam| Kaufling| ST_MAN|  ST_CLERK|4737|  677|
|  Jonathon|   Taylor| SA_REP|    SA_MAN|4737|  677|
|  Jennifer|   Whalen|AD_ASST|AC_ACCOUNT|5102|  729|
|   Michael|Hartstein| MK_MAN|    MK_REP|4749|  679|
+----------+---------+-------+----------+----+-----+



In [125]:
spark.sql("""SELECT e.FIRST_NAME, e.LAST_NAME, e.JOB_ID as cur_job, j.JOB_ID as prev_job, 
DATEDIFF(current_date(), j.END_DATE) as days, CEIL(DATEDIFF(current_date(), j.END_DATE) / 7) as weeks
FROM (
  SELECT *, ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY END_DATE DESC) as row_num
  FROM job_history
) as j
INNER JOIN employees as e ON e.EMPLOYEE_ID = j.EMPLOYEE_ID
WHERE row_num = 1
""").show()

+----------+---------+-------+----------+----+-----+
|FIRST_NAME|LAST_NAME|cur_job|  prev_job|days|weeks|
+----------+---------+-------+----------+----+-----+
|     Neena|  Kochhar|  AD_VP|    AC_MGR|5758|  823|
|       Lex|  De Haan|  AD_VP|   IT_PROG|5262|  752|
|       Den| Raphaely| PU_MAN|  ST_CLERK|4737|  677|
|     Payam| Kaufling| ST_MAN|  ST_CLERK|4737|  677|
|  Jonathon|   Taylor| SA_REP|    SA_MAN|4737|  677|
|  Jennifer|   Whalen|AD_ASST|AC_ACCOUNT|5102|  729|
|   Michael|Hartstein| MK_MAN|    MK_REP|4749|  679|
+----------+---------+-------+----------+----+-----+



Выведите уникальные телефонные номера

In [128]:
(employees_rdd
 .select('PHONE_NUMBER')
 .distinct()
 .show()
 )

+------------------+
|      PHONE_NUMBER|
+------------------+
|011.44.1344.429018|
|      515.127.4566|
|      515.127.4564|
|011.44.1344.429278|
|      515.123.4569|
|      650.124.1434|
|      650.123.2234|
|011.44.1344.498718|
|      650.127.1634|
|      515.127.4561|
|011.44.1345.629268|
|      515.127.4562|
|011.44.1644.429264|
|011.44.1644.429262|
|      650.501.1876|
|      650.127.1834|
|011.44.1343.529268|
|011.44.1644.429265|
|      515.123.8181|
|      650.507.9833|
+------------------+
only showing top 20 rows



In [130]:
spark.sql("""SELECT DISTINCT PHONE_NUMBER FROM employees""").show()

+------------------+
|      PHONE_NUMBER|
+------------------+
|011.44.1344.429018|
|      515.127.4566|
|      515.127.4564|
|011.44.1344.429278|
|      515.123.4569|
|      650.124.1434|
|      650.123.2234|
|011.44.1344.498718|
|      650.127.1634|
|      515.127.4561|
|011.44.1345.629268|
|      515.127.4562|
|011.44.1644.429264|
|011.44.1644.429262|
|      650.501.1876|
|      650.127.1834|
|011.44.1343.529268|
|011.44.1644.429265|
|      515.123.8181|
|      650.507.9833|
+------------------+
only showing top 20 rows



Есть ли сотрудники с одинаковыми фамилиями и сколько их.

In [139]:
(employees_rdd
 .groupby('LAST_NAME')
 .count()
 .where(col('count') > 1)
 .show()
 )

+---------+-----+
|LAST_NAME|count|
+---------+-----+
|    Smith|    2|
|     King|    2|
|Cambrault|    2|
|   Taylor|    2|
+---------+-----+



In [133]:
spark.sql("""SELECT LAST_NAME, COUNT(*) as dup_count FROM employees
GROUP BY LAST_NAME
HAVING dup_count > 1
""").show()

+---------+---------+
|LAST_NAME|dup_count|
+---------+---------+
|    Smith|        2|
|     King|        2|
|Cambrault|        2|
|   Taylor|        2|
+---------+---------+

