<a href="https://colab.research.google.com/github/iagunov/spark_data_analysis/blob/main/iagunov_spark_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [None]:
!tar xvzf spark-3.5.0-bin-hadoop3.tgz

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [None]:
!pip install pyspark

In [None]:
!pip install findspark

In [None]:
!echo $SPARK_HOME

In [8]:
import findspark
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import  SparkContext
from pyspark.sql import functions as f
appName = "PySpark Task 1"
#master = "spark://10.3.100.4:7077"
master = 'local[*]'
# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .appName(appName) \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

In [None]:
df_emp = spark.read.json("/content/sample_data/anscombe.json")
df_emp.show(5,False)

**TASK 1**

Напишите запрос, возвращающий строку следующего вида для каждого сотрудника: "<фамилия> зарабатывает <зарплата> каждый месяц, но хочет получать <зарплата * 3>".
Назовите колонку 'Dream Salaries'. Результат сохранить в формате parquet со сжатием GZIP

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)

# Вывод схемы DataFrame
df_emp.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)

In [None]:
# Создание временного представления для DataFrame
df_emp.createOrReplaceTempView("employees")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
        CONCAT(
            CAST(employees.last_name AS VARCHAR(50)),
            ' зарабатывает ',
            CAST(employees.salary AS VARCHAR(50)),
            ' каждый месяц, но хочет получать ',
            CAST(employees.salary * 3 AS VARCHAR(50))
        ) AS `Dream Salaries`
    FROM employees
""")

# Вывод первых 3 строк DataFrame с учетом длинных строк
df.show(3, truncate=False)

In [None]:
# Запись DataFrame в формате Parquet с указанием сжатия GZIP и режима "overwrite"
df.write.format("parquet") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save("data_out/task_1")

# Чтение данных из Parquet-файла с указанием сжатия
spark.read.format("parquet") \
          .option("compression", "gzip") \
          .load("data_out/task_1") \
          .show(truncate=False)

**TASK 2**

Напишите запрос, возвращающий адреса всех департаментов. Запрос должен возвращать ID локации, адрес (улицу), город, штат и страну.

In [None]:
# Чтение данных из файлов в DataFrame
df_dep = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .load("/content/departments")
)
df_loc = (
    spark.read.format("parquet")
    .option("compression", "gzip")
    .load("/content/part-00000-f3626ac3-ae1a-49d5-8aa5-760f2e41b579-c000.gz.parquet")
)
df_cont = (
    spark.read.format("orc")
    .option("codec", "snappy")
    .load("/content/part-00000-ebe0451d-1509-4b7e-a7fd-abeb9e5488ef-c000.snappy.orc")
)

# Вывод схемы DataFrame
df_dep.printSchema()
df_loc.printSchema()
df_cont.printSchema()

# Вывод первых 5 строк DataFrame
df_dep.show(5, truncate=False)
df_loc.show(5, truncate=False)
df_cont.show(5, truncate=False)

In [None]:
# Создание временного представления для DataFrame
df_dep.createOrReplaceTempView("departments")
df_loc.createOrReplaceTempView("locations")
df_cont.createOrReplaceTempView("countries")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
        location_id,
        street_address,
        city,
        state_province,
        country_name
    FROM departments
    NATURAL JOIN locations
    NATURAL JOIN countries;
""")

# Вывод первых 3 строк DataFrame с учетом длинных строк
df.show(truncate=False)

In [None]:
# Сохранение результата
df.write.format("orc") \
        .option("codec", "snappy") \
        .mode("overwrite") \
        .save("data_out/task_2")

# Проверка результата
spark.read.format("orc") \
          .option("codec", "snappy") \
          .load("data_out/task_2") \
          .show(truncate=False)

**TASK 3**

Напишите запрос, возвращающий фамилию, ID отдела и наименование отдела для каждого сотрудника; Результат сохранить в формате avro со сжатием GZIP

In [None]:
# Чтение данных из CSV-файлов в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)
df_dep = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .load("/content/departments")
)

# Вывод схемы DataFrame
df_emp.printSchema()
df_dep.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)
df_dep.show(5)

In [None]:
# Создание временного представления для DataFrame
df_emp.createOrReplaceTempView("employees")
df_dep.createOrReplaceTempView("departments")


# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
	    emp.last_name,
	    emp.department_id,
	    dp.department_name
    FROM
	    employees emp
    INNER JOIN
	    departments dp ON emp.department_id = dp.department_id;
""")

# Вывод первых 3 строк DataFrame с учетом длинных строк
df.show(truncate=False)

In [None]:
# Сохранение результата в формате avro со сжатием gzip
df.write.format("avro") \
        .option("codec", "gzip") \
        .mode("overwrite") \
        .save("data_out/task_3.avro")

# Проверка результата
spark.read.format("avro") \
          .option("codec", "gzip") \
          .load("data_out/task_3.avro") \
          .show(truncate=False)

**TASK 4**

Напишите запрос, возвращающий фамилию, ID сотрудника, фамилию менеджера и ID менеджера для каждого сотрудника (для сотрудников, у которых нет менеджера, в этих колонках должен быть NULL).
Назовите колонки 'Employee', 'Emp#', 'Manager', 'Mgr#'. Результат сохранить в формате avro со сжатием Snappy

In [None]:
# Чтение данных из CSV-файлов в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)

# Вывод схемы DataFrame
df_emp.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)

In [None]:
# Создание временного представления для DataFrame
df_emp.createOrReplaceTempView("employees")


# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
        e.last_name AS Employee,
        e.employee_id AS `Emp`,
        m.last_name AS Manager,
        m.employee_id AS `Mgr`
    FROM
        employees e
    LEFT JOIN
        employees m ON e.manager_id = m.employee_id;
""")

# Вывод первых 3 строк DataFrame с учетом длинных строк
df.show(truncate=False)

In [None]:
# Использование символа `#` в названиях столбцов не поддерживается snappy, использования обратных ковычек и обратного слеша не помогает.
# Можно либо переименовать столбцы либо использовать другой кодек который это поддерживает, например deflate, я выбрал переименовать столбцы.

# Сохранение результата
df.write.format("avro") \
        .option("codec", "snappy") \
        .mode("overwrite") \
        .save("data_out/task_4.avro")

# Проверка результата
spark.read.format("avro") \
          .option("codec", "snappy") \
          .load("data_out/task_4.avro") \
          .show(truncate=False)

**TASK 5**

Напишите запрос, возвращающий фамилии и зарплаты всех сотрудников, которые подчиняются сотруднику King.
Используйте подзапрос.

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)

# Вывод схемы DataFrame
df_emp.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)

In [None]:
# Создание временного представления "employees" для DataFrame
df_emp.createOrReplaceTempView("employees")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
	    last_name,
	    salary
    FROM employees
    WHERE manager_id IN (SELECT employee_id FROM employees WHERE last_name = 'King')
""")

# Вывод строк DataFrame без обрезки значений
df.show(truncate=False)

In [None]:
# Запись DataFrame в формате Parquet с указанием сжатия GZIP и режима "overwrite"
df.write.format("parquet") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save("data_out/task_5")

# Чтение данных из Parquet-файла с указанием сжатия
spark.read.format("parquet") \
          .option("compression", "gzip") \
          .load("data_out/task_5") \
          .show(truncate=False)

**TASK 6**

Напишите запрос, возвращающий фамилии всех сотрудников, получающих больше, чем любой сотрудник отдела с ID 60.

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)

# Вывод схемы DataFrame
df_emp.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)

In [None]:
# Создание временного представления "employees" для DataFrame
df_emp.createOrReplaceTempView("employees")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
	    last_name
    FROM employees
    WHERE salary > (SELECT MAX(salary) FROM employees WHERE department_id = 60)
""")

# Вывод строк DataFrame без обрезки значений
df.show(truncate=False)

In [None]:
# Запись DataFrame в формате Parquet с указанием сжатия GZIP и режима "overwrite"
df.write.format("parquet") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save("data_out/task_6")

# Чтение данных из Parquet-файла с указанием сжатия
spark.read.format("parquet") \
          .option("compression", "gzip") \
          .load("data_out/task_6") \
          .show(truncate=False)

**TASK 7**

Напишите запрос, возвращающий ID, фамилии и зарплаты всех сотрудников, работающих в одном отделе с работником, в чьей фамилии есть буква 'u' и получающих больше средней зарплаты в компании.

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)

# Вывод схемы DataFrame
df_emp.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)

In [None]:
# Создание временного представления "employees" для DataFrame
df_emp.createOrReplaceTempView("employees")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
	    employee_id,
	    last_name,
	    salary
    FROM employees
    WHERE department_id = (SELECT department_id FROM employees WHERE upper(last_name) LIKE '%U%' LIMIT 1) AND
	        salary > (SELECT AVG(salary) FROM employees)
""")

# Вывод строк DataFrame без обрезки значений
df.show(truncate=False)

In [None]:
# Сохранение результата
df.write.format("orc") \
        .option("codec", "snappy") \
        .mode("overwrite") \
        .save("data_out/task_7")

# Проверка результата
spark.read.format("orc") \
          .option("codec", "snappy") \
          .load("data_out/task_7") \
          .show(truncate=False)

**TASK 8**

Выведите фамилии, id отдела и название отдела для всех сотрудников, не привязанных ни к одному отделу, а также список отделов, к которым не привязан ни один сотрудник.

In [None]:
# Чтение данных из CSV-файла в DataFrame
df_emp = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load("/content/employees")
)
df_dep = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .load("/content/departments")
)

# Вывод схемы DataFrame
df_emp.printSchema()
df_dep.printSchema()

# Вывод первых 5 строк DataFrame
df_emp.show(5)
df_dep.show(5)

In [None]:
# Создание временного представления для DataFrames
df_emp.createOrReplaceTempView("employees")
df_dep.createOrReplaceTempView("departments")

# SQL-запрос для формирования нового DataFrame с использованием Spark SQL
df = spark.sql("""
    SELECT
	    emp.employee_id,
	    emp.last_name
    FROM employees emp
    WHERE emp.department_id IS NULL

    UNION ALL

    SELECT
	    dep.department_id,
	    dep.department_name
    FROM employees emp
    RIGHT JOIN departments dep USING(department_id)
    WHERE emp.department_id IS NULL;
""")

# Вывод строк DataFrame без обрезки значений
df.show(truncate=False)

In [None]:
# Сохранение результата
df.write.format("orc") \
        .option("codec", "snappy") \
        .mode("overwrite") \
        .save("data_out/task_8")

# Проверка результата
spark.read.format("orc") \
          .option("codec", "snappy") \
          .load("data_out/task_8") \
          .show(truncate=False)