# Тестовые задачи по взаимодействию с SparkAPI

Установка спарка на машину

In [1]:
pip install pyspark



Обратимся к гиту, где лежат некоторые файлы для выполнения задач.

In [2]:
!git clone https://github.com/databricks/LearningSparkV2.git

Cloning into 'LearningSparkV2'...
remote: Enumerating objects: 1720, done.[K
remote: Counting objects: 100% (1720/1720), done.[K
remote: Compressing objects: 100% (1036/1036), done.[K
remote: Total 1720 (delta 546), reused 1691 (delta 541), pack-reused 0 (from 0)[K
Receiving objects: 100% (1720/1720), 76.97 MiB | 11.67 MiB/s, done.
Resolving deltas: 100% (546/546), done.
Updating files: 100% (768/768), done.


# Задачи

## 1. Возможности спарка

In [None]:
#Собираем библиотеки
import requests
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType, FloatType, IntegerType
from google.colab import files
import pandas as pd
from google.colab import files

#ссылка на загрузку CSV файла
url = f'https://drive.google.com/uc?id=1ZuLZDoPboHnM1P8m3PflIe70zUGvjsRq'
#Загружаем файл
response = requests.get(url)
#Проверяем статус ответа, если ок, то открываем файл car в памяти
if response.status_code == 200:
    with open('car.csv', 'wb') as f:
        f.write(response.content)
    print("Файл загружен")
else:
    print(f"Файл не загрузился {response.status_code}")

Файл загружен


In [None]:
#Делаем функцию, которая принимает загружаемый файл и открывает его как объект спарка

#вытаскиваем имя выгружаемого файла
file_name = str(f).split("name='")[1].split("'")[0]

def create_spark_object(file_name):
    if str(f).lower().find(file_name) > 1:
        spark = SparkSession.builder.appName('Test').getOrCreate()
        dt = spark.read.format("csv").option("header", "true").load(file_name)
        return dt
df = create_spark_object(file_name)

df.show(1, vertical = True)

-RECORD 0-------------------------
 manufacturer_name | Subaru       
 model_name        | Outback      
 transmission      | automatic    
 color             | silver       
 odometer_value    | 190000       
 year_produced     | 2010         
 engine_fuel       | gasoline     
 engine_has_gas    | False        
 engine_type       | gasoline     
 engine_capacity   | 2.5          
 body_type         | universal    
 has_warranty      | False        
 state             | owned        
 drivetrain        | all          
 price_usd         | 10900.0      
 is_exchangeable   | False        
 location_region   | Минская обл. 
 number_of_photos  | 9            
 up_counter        | 13           
 feature_0         | False        
 feature_1         | True         
 feature_2         | True         
 feature_3         | True         
 feature_4         | False        
 feature_5         | True         
 feature_6         | False        
 feature_7         | True         
 feature_8         |

In [None]:
#Select
df.select("manufacturer_name","model_name").show(1)
df.select(df["manufacturer_name"],df["model_name"]).show(1)
df.select(F.col("manufacturer_name"), F.col("model_name")).show(3)

+-----------------+----------+
|manufacturer_name|model_name|
+-----------------+----------+
|           Subaru|   Outback|
+-----------------+----------+
only showing top 1 row

+-----------------+----------+
|manufacturer_name|model_name|
+-----------------+----------+
|           Subaru|   Outback|
+-----------------+----------+
only showing top 1 row

+-----------------+----------+
|manufacturer_name|model_name|
+-----------------+----------+
|           Subaru|   Outback|
|           Subaru|   Outback|
|           Subaru|  Forester|
+-----------------+----------+
only showing top 3 rows



In [None]:
#filter
NAME = 'Audi'

df\
   .select("manufacturer_name","model_name","transmission", "color")\
   .filter(F.col("manufacturer_name") == NAME)\
   .filter("transmission = 'automatic' and color = 'black'").show(2)

+-----------------+----------+------------+-----+
|manufacturer_name|model_name|transmission|color|
+-----------------+----------+------------+-----+
|             Audi|        TT|   automatic|black|
|             Audi|        A6|   automatic|black|
+-----------------+----------+------------+-----+
only showing top 2 rows



In [None]:
#distinct_count
df.select("manufacturer_name").distinct().count()

55

In [None]:
#count
df.count()

38531

In [None]:
#GROUPBY() and ORDERBY()
df.groupBy("manufacturer_name").count().orderBy(F.col("count").desc()).show(5)

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|             Opel| 2759|
|              BMW| 2610|
|             Ford| 2566|
|          Renault| 2493|
+-----------------+-----+
only showing top 5 rows



In [None]:
# Переименовываем колонку
df = df.withColumnRenamed("manufacturer_name", "manufacturer")

In [None]:
#Создаем новую колонку
df = df.withColumn("next_year", F.col("year_produced") - 1)

In [None]:
#Посмотрим типы колонок
df.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)
 |-- 

In [None]:
#посмотрим метрики по чиловым столбцам
df.select('odometer_value', 'year_produced', 'engine_capacity', 'price_usd', 'number_of_photos', 'up_counter', 'duration_listed').describe().show()

+-------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+
|summary|    odometer_value|     year_produced|  engine_capacity|        price_usd| number_of_photos|        up_counter|   duration_listed|
+-------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+
|  count|             38531|             38531|            38521|            38531|            38531|             38531|             38531|
|   mean| 248864.6384469648|2002.9437336170874|2.055161106928777|6639.971021255605|9.649061794399314|16.306091199294073|  80.5772494874257|
| stddev|136072.37652978086| 8.065730511309935|0.671177667208744|6428.152018202911|6.093216996872852| 43.28693309422311|112.82656864261321|
|    min|                 0|              1942|              0.2|              1.0|                1|                 1|                 0|
|    max|           

In [None]:
#изменим тыпы колонок
df.withColumn("odometer_value", df["odometer_value"].cast(IntegerType()))\
  .withColumn("year_produced", df["year_produced"].cast(IntegerType()))\
  .withColumn("engine_capacity", df["engine_capacity"].cast(FloatType()))\
  .withColumn("price_usd", df["price_usd"].cast(FloatType()))\
  .withColumn("number_of_photos", df["number_of_photos"].cast(IntegerType()))\
  .withColumn("up_counter", df["up_counter"].cast(IntegerType()))\
  .withColumn("duration_listed", df["duration_listed"].cast(IntegerType()))\
  .withColumn("next_year", df["next_year"].cast(IntegerType()))\
  .printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: integer (nullable = true)
 |-- year_produced: integer (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: float (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: float (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: integer (nullable = true)
 |-- up_counter: integer (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)
 |-

## 2. Перейдем к написанию небольшого пайплана

Задача: Необходимо сделать пайплайн обработки файла cars.csv . Посчитать по каждому производителю (поле manufacturer_name):

*   Количество объявлений
*   Средний год выпуска автомобилей
*   Минимальную цену
*   Максимальную цену

Выгрузить результат в output.csv


In [None]:
#Напишем фнкцию, которая производит обработку и выгружает данные
def create_output():
    output = (
        df
        .groupBy('manufacturer')
        .agg(
            F.count('manufacturer').alias('Count'),
            F.round(F.avg('year_produced')).cast(IntegerType()).alias('Avarage'),
            F.min(F.col('price_usd').cast(FloatType())).alias('Min_price'),
            F.max(F.col('price_usd').cast(FloatType())).alias('Max_price')
            )
        )
    output.show(5)
    dt = pd.DataFrame(output.take(5), columns=output.columns)
    dt.to_csv('filename.csv', index=False)
    files.download('filename.csv')
    return dt

create_output()

+------------+-----+-------+---------+---------+
|manufacturer|Count|Avarage|Min_price|Max_price|
+------------+-----+-------+---------+---------+
|  Volkswagen| 4243|   2002|      1.0|  43999.0|
|       Lexus|  213|   2008|   2500.0| 48610.45|
|      Jaguar|   53|   2009|   2500.0|  50000.0|
|       Rover|  235|   1998|    200.0|   9900.0|
|      Lancia|   92|   2000|    200.0|   9500.0|
+------------+-----+-------+---------+---------+
only showing top 5 rows



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Unnamed: 0,manufacturer,Count,Avarage,Min_price,Max_price
0,Volkswagen,4243,2002,1.0,43999.0
1,Lexus,213,2008,2500.0,48610.449219
2,Jaguar,53,2009,2500.0,50000.0
3,Rover,235,1998,200.0,9900.0
4,Lancia,92,2000,200.0,9500.0


In [None]:
#Остонавливаем сессию спарк
spark.stop()

### 1. Поиск M&Ms

Дан сsv файл, который подтягивается из гугл диска.
Задача посчитать количество M&Ms по цветовой гамме в пачках в разных штатах.

In [None]:
#Библиотеки
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
import requests

#ссылка на загрузку CSV файла c гугл Диска
url = f'https://drive.google.com/uc?id=1oZTMFL05rLTN-oqBXl4W-rWS-9bpcphg'

#Загружаем файл
response = requests.get(url)
#Проверяем статус ответа, если ок, то открываем файл в памяти
if response.status_code == 200:
    with open('mnm.csv', 'wb') as f:
        f.write(response.content)
    print("Файл загружен")
else:
    print(f"Файл не загрузился {response.status_code}")

In [None]:
#выводим название файла
file_name = str(f).split("name='")[1].split("'")[0]
#вызываем функцию, которая генерит объект объект Spark
df = create_spark_object(file_name)

#посмотрим кол-во конфет в разрезе штатов и цветовой гаммы
count_mnm_state_color = (df.select('State', 'Color', 'Count')
                           .groupBy('State', 'Color')
                           .agg(count("Count").alias("Total"))
                           .orderBy("Total", ascending=False).show())

#проверим конфеты по Калифорнии
count_mnm_NY = (df.select('State','Color','Count')
                  .where(df.State == 'CA')
                  .groupBy('State', 'Color')
                  .agg(count("Count").alias('Total'))
                  .orderBy("Total", ascending=False)
                  .show())
count_mnm_NY

## 3. Поиск слова Spark в тексте.

Дан .md файл с текстом. Задача, выгрузить строки, где есть слово Spark из файла md формата.

In [None]:
#Выгрузка библиотек
from pyspark.sql import SparkSession
from google.colab import drive
from google.colab import files

#обращаемся к гугл диску
drive.mount('/content/drive', force_remount=True)

#создаем спарк приложение и читаем файл
spark = SparkSession.builder.config('spark.ui.port', '4050').appName('Test').getOrCreate()
file_path = '/content/drive/My Drive/README.md'
strings = spark.read.text(file_path)

#выводим строки с неоьходимым нам словом
filtered = strings.filter(strings.value.contains("Spark"))
filtered.show(100)

In [None]:
#Останавливаем сессию
spark.stop()

## 4. Создаем схему




### 1. Задание
Необходимо сгенерить данные и создать под них схему.

In [None]:
# Выгружаем библиотеку
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Прописываем схему DDL
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
# Данные для датафрейма
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
 [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",
"LinkedIn"]],
 [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",
"twitter", "FB", "LinkedIn"]],
 [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
["twitter", "FB"]],
 [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",
"twitter", "FB", "LinkedIn"]],
 [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
["twitter", "LinkedIn"]]
 ]
spark = (SparkSession
 .builder
 .appName("Example-3_6")
 .getOrCreate())

# Создаем датафрейм используя данные и схему
blogs_df = spark.createDataFrame(data, schema)

#Выводим данные и структуру схемы
blogs_df.show()
print(blogs_df.schema)
print(' ')
blogs_df.printSchema()

In [None]:
blogs_df\
       .withColumn('Big_Hints', F.col('Hits') > 5000)\
       .withColumn('Auth_data', (F.concat(F.col('First'), F.col('Last'), F.col('Id'))))\
       .sort(F.col('Hits').desc())\
       .show()

### 2. Задание
Необходимо создать схему и положить в нее данные из GitHub. <a name="4.2">

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

#Моделим схему
file_schema = StructType([StructField('CallNumber', IntegerType(), True),
                          StructField('UnitID', StringType(), True),
                          StructField('InciedentNumber', IntegerType(),True),
                          StructField('CallType', StringType(), True),
                          StructField('CallDate', StringType(), True),
                          StructField('WatchDate', StringType(), True),
                          StructField('CallFinalDisposition', StringType(), True),
                          StructField('AvailableDtTm', StringType(), True),
                          StructField('Address', StringType(), True),
                          StructField('City', StringType(), True),
                          StructField('Zipcode', IntegerType(), True),
                          StructField('Battalion', StringType(), True),
 StructField('StationArea', StringType(), True),
 StructField('Box', StringType(), True),
 StructField('OriginalPriority', StringType(), True),
 StructField('Priority', StringType(), True),
 StructField('FinalPriority', IntegerType(), True),
 StructField('ALSUnit', BooleanType(), True),
 StructField('CallTypeGroup', StringType(), True),
 StructField('NumAlarms', IntegerType(), True),
 StructField('UnitType', StringType(), True),
 StructField('UnitSequenceInCallDispatch', IntegerType(), True),
 StructField('FirePreventionDistrict', StringType(), True),
 StructField('SupervisorDistrict', StringType(), True),
 StructField('Neighborhood', StringType(), True),
 StructField('Location', StringType(), True),
 StructField('RowID', StringType(), True),
 StructField('Delay', FloatType(), True)])

#Читаем датафрейм CSV из GitHub
spark = SparkSession.builder.config('spark.ui.port', '4050').appName('Test').getOrCreate()
str_file = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
fire_df = spark.read.csv(str_file, header = True, schema = file_schema)

fire_df.show(5)
fire_df.printSchema()

## 5. Работа с строками

Создать датафрейм из объекта Row

In [None]:
from pyspark.sql import Row

#содержимое строки
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
author_row = spark.createDataFrame(rows, ["Authors","Region"])
author_row.show()

## 6. Поиск не пустых значений

Нужно посмотреть причины вызовов пожарных частей - значения должны быть не Null.

Для выпонения надо выполнить [4.2](#4.2)  



In [None]:
#Читаем датафрейм CSV из GitHub
from pyspark.sql import SparkSession

fire_df.select(F.col("CallType"))\
       .filter(F.col("CallType").isNotNull())\
       .groupBy(F.col("CallType"))\
       .agg(F.count('CallType').alias('Count'))\
       .orderBy(F.col("Count").desc())\
       .show(100,False)

## 7. Замена столбцов и их удаление

Дан датафрейм из Задачи [4.2](#4.2) необходимо изменить тип временных колонок для дальнейшей работы  с ними. Таже необходимо подготовить ответы на вопросы -
* Посмотреть за какие года у были инциденты.
* Какие виды пожара были в 2018 году.
* В какие месяцы 2018 года было больше всего вызовов пожарных?
* В каком районе Сан Франциско произошло больше всего звонков о пожаре в 2018?
* В каких районах было худшее время реагирования пожарных в 2018 году?
* На какой неделе 2018 года было больше всего вызовов пожарных?
* Как использовать тип файлов Паркет, для хранения этих данных.

In [None]:
#Задача У всех временных столбцов тип поля string
fire_df.select('CallDate','WatchDate','AvailableDtTm').printSchema()

#заменим типы и удалим старые колонки
fire_df = (fire_df
           .withColumn("IncidentDate", F.to_timestamp("CallDate", "MM/dd/yyyy"))
           .drop("CallDate")
           .withColumn("OnWatchDate", F.to_timestamp("WatchDate","MM/dd/yyyy"))
           .drop("WatchDate")
           .withColumn("AvailableDtTS", F.to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
           .drop("AvailableDtTm"))

#Проверка
fire_df.printSchema()

In [None]:
#Задача 1 за какие года были инциденты
fire_df.select(F.year('IncidentDate').alias('Year')).distinct().orderBy(F.year('IncidentDate')).show(100,False)

In [None]:
#Задача 2 какие виды пожаров были в 2018
fire_df.select(F.col("CallType"))\
       .filter(F.year('IncidentDate') == 2018)\
       .groupBy(F.col("CallType"))\
       .agg(F.count('CallType').alias('Count'))\
       .orderBy(F.col("Count").desc())\
       .show(100,False)

In [None]:
#Задача 3 в какие месяцы 2018 года было больше всего вызовов пожарных
a = (fire_df.select(F.month("IncidentDate").alias('MONTH'), F.year('IncidentDate').alias('year'))
            .filter(F.year('IncidentDate') == 2018)
            .groupBy('MONTH', 'year')
            .agg(F.count('*').alias('count_calls'))
            .orderBy(F.col('count_calls').desc())
            .show(100,False))
a

In [None]:
#Задача 4 в каком районе Сан Франциско произошло больше всего звонков о пожаре в 2018?
a = (fire_df
           .select('Neighborhood', 'NumAlarms')
           .where((F.col('City') == 'San Francisco') & (F.year('IncidentDate') == 2018))
           .groupBy('Neighborhood')
           .agg(F.sum('NumAlarms').alias('Count_calls'))
           .orderBy(F.col('Count_calls').desc())
           .show(1, False))
a

In [None]:
#Задание 5 в каких районах было худшее время реагирования пожарных в 2018 году?
a = (fire_df
           .select('City', 'Neighborhood', 'Delay')
           .filter(F.year('IncidentDate') == 2018)
           .orderBy(F.col('Delay').desc())
           .show(5, False))

In [None]:
#Задание 6 на какой неделе 2018 года было больше всего вызовов пожарных?
a = (fire_df
           .select('IncidentDate', 'NumAlarms')
           .filter(F.year('IncidentDate') == 2018)
           .groupBy(F.weekofyear('IncidentDate'))
           .agg(F.sum('NumAlarms').alias('sums'))
           .orderBy(F.col('sums').desc())
           .show(5, False))

In [None]:
#Задание 7 как использовать тип файлов Паркет, для хранения этих данных.
fire_df.write.format('parquet').save('/content/sample_data/qwe')

In [None]:
from pyspark.sql import Row
row = Row(350, True, "Learning Spark 2E", None)
row[2]

## 8. SparkSQL совместимость с DataFrame

Необходимо прочесть данные во временном представлении. С помощью sql.

In [3]:
#импорт библиотеки
from pyspark.sql import SparkSession

#открываем сессию
spark = SparkSession.builder \
    .appName("Hive options in PySpark") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.1") \
    .enableHiveSupport() \
    .getOrCreate()

#ссылка на датассет
csv_file = '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv'

#делаем датафрейм в Спарке
df = (spark.read.format("csv")
                .option("inferSchema", "true")
                .option("header", "true")
                .load(csv_file))

#создаем временную таблицу
df.createOrReplaceTempView("us_delay_flights_tbl")

#создаем схему
schema = "'date' STRING, 'delay' INT, 'distance' INT, 'origin' STRING, 'destination' STRING"
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows



* date - cтолбец даты содержит строку типа 02190925. При преобразовании это соответствует 02–19 09:25.

* delay - в столбце задержки указана задержка в минутах между запланированным и фактическим временем.
время отправления. Ранние вылеты показывают отрицательные числа.

* distance - в столбце «Расстояние» указано расстояние в милях от аэропорта отправления до пункта отправления.
аэропорт назначения.

* origin - cтолбец происхождения содержит код аэропорта отправления IATA.

* destination - cтолбец назначения содержит код аэропорта назначения IATA.

In [None]:
#Задача 1. Найдем все рейсы где расстояние превышает 1000 миль.
spark.sql("SELECT distance, origin, destination \
           FROM us_delay_flights_tbl \
           WHERE distance > 1000 \
           ORDER BY distance DESC").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



Все самые длинные перелеты были между Гонолулу (HNL) и Нью-Йорком.
Йорк (JFK).

In [None]:
#Задача 2. Найдем все рейсы между Сан-Франциско (SFO) и Чикаго. (ORD) с задержкой не менее двух часов
spark.sql("SELECT * FROM us_delay_flights_tbl \
           WHERE origin = 'SFO' AND destination = 'ORD' \
                 AND delay > 120 \
           ORDER BY delay DESC").show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|2190925| 1638|    1604|   SFO|        ORD|
|1031755|  396|    1604|   SFO|        ORD|
|1022330|  326|    1604|   SFO|        ORD|
|1051205|  320|    1604|   SFO|        ORD|
|1190925|  297|    1604|   SFO|        ORD|
+-------+-----+--------+------+-----------+
only showing top 5 rows



In [None]:
#Задача 3. Пометим все рейсы в США, независимо от пункта отправления и назначения, с указанием возникших задержек:
#очень длительные задержки (> 6 часов), Длительные задержки (2–6 часов) и т. д.
spark.sql("SELECT delay, origin, destination, \
             CASE \
                 WHEN delay > 360 THEN 'Very Long Delays' \
                 WHEN delay > 120 AND delay < 360 THEN 'Long Delays' \
                 WHEN delay > 60 AND delay < 120 THEN 'Short Delays' \
                 WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays' \
                 WHEN delay = 0 THEN 'No Delays' \
                 ELSE 'Early' \
              END AS Flight_Delays \
           FROM us_delay_flights_tbl \
           ORDER BY origin, delay DESC").show(18)
spark.stop()

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
|  180|   ABE|        ATL|  Long Delays|
|  173|   ABE|        DTW|  Long Delays|
|  165|   ABE|        ATL|  Long Delays|
|  159|   ABE|        ORD|  Long Delays|
|  159|   ABE|        ATL|  Long Delays|
|  158|   ABE|        ATL|  Long Delays|
|  151|   ABE|        DTW|  Long Delays|
|  127|   ABE|        ATL|  Long Delays|
+-----+------+-----------+-------------+
only showing top 18 rows



In [None]:
#Задача 4. Создать бд и поселить в ней таблицу

#импорт библиотеки
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Create spark session with hive enabled (изменен порядок)
spark = SparkSession.builder \
    .appName("Hive options in PySpark") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.1") \
    .enableHiveSupport() \
    .getOrCreate()

#создаем бд
spark.sql("CREATE DATABASE IF NOT EXISTS learn_spark_db")

#задействуем ее
spark.sql("USE learn_spark_db")

#создаем управляемую таблицу
spark.sql("CREATE TABLE IF NOT EXISTS managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")
spark.sql("SELECT * FROM managed_us_delay_flights_tbl").show(5)

#также можно создать неуправляемую таблицу
spark.sql("""CREATE TABLE IF NOT EXISTS us_delay_flights_tbl(date STRING, delay INT,
 distance INT, origin STRING, destination STRING)
 USING csv OPTIONS (PATH
 '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|    date| NULL|    NULL|origin|destination|
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows



### Глобальне и временные представления

In [None]:
#Задача 5. Создать глобальную и временную вьюху
df_sfo_GLOB = spark.sql("SELECT date, delay, origin, destination \
                         FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_sfo_GLOB.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
print("Глобальное представление")
spark.sql('SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view').show(5)


df_jfk = spark.sql("SELECT date, delay, origin, destination \
                    FROM us_delay_flights_tbl WHERE origin = 'JFK'")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
print("Локальное представление")
spark.sql('SELECT * FROM us_origin_airport_JFK_tmp_view').show(5)

#метаинфа по базе данных
spark.catalog.listDatabases()
spark.catalog.listTables()

Глобальное представление
+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01011250|   55|   SFO|        JFK|
|01012230|    0|   SFO|        JFK|
|01010705|   -7|   SFO|        JFK|
|01010620|   -3|   SFO|        MIA|
|01010915|   -3|   SFO|        LAX|
+--------+-----+------+-----------+
only showing top 5 rows

Локальное представление
+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01010900|   14|   JFK|        LAX|
|01011200|   -3|   JFK|        LAX|
|01011900|    2|   JFK|        LAX|
|01011700|   11|   JFK|        LAS|
|01010800|   -1|   JFK|        SFO|
+--------+-----+------+-----------+
only showing top 5 rows



[Table(name='managed_us_delay_flights_tbl', catalog='spark_catalog', namespace=['learn_spark_db'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='us_delay_flights_tbl', catalog='spark_catalog', namespace=['learn_spark_db'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='us_origin_airport_JFK_tmp_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
#Задача 5 (1) тоже самое на SQL
spark.sql("CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'SFO'")
spark.sql("SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view").show(5)

spark.sql("CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'JFK'")
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view").show(5)

spark.sql('DROP TABLE us_origin_airport_JFK_tmp_view')
spark.sql('DROP TABLE global_temp.us_origin_airport_SFO_global_tmp_view')

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01011250|   55|   SFO|        JFK|
|01012230|    0|   SFO|        JFK|
|01010705|   -7|   SFO|        JFK|
|01010620|   -3|   SFO|        MIA|
|01010915|   -3|   SFO|        LAX|
+--------+-----+------+-----------+
only showing top 5 rows

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01010900|   14|   JFK|        LAX|
|01011200|   -3|   JFK|        LAX|
|01011900|    2|   JFK|        LAX|
|01011700|   11|   JFK|        LAS|
|01010800|   -1|   JFK|        SFO|
+--------+-----+------+-----------+
only showing top 5 rows



DataFrame[]

### Метаинформация по таблицам, листам, бд

In [None]:
#Метаинфа по БД
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/content/spark-warehouse'),
 Database(name='learn_spark_db', catalog='spark_catalog', description='', locationUri='file:/content/spark-warehouse/learn_spark_db.db')]

In [None]:
#Метаинфа по таблицам
spark.catalog.listTables()

[Table(name='managed_us_delay_flights_tbl', catalog='spark_catalog', namespace=['learn_spark_db'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='us_delay_flights_tbl', catalog='spark_catalog', namespace=['learn_spark_db'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='us_origin_airport_JFK_tmp_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
#метаинфа по определенной таблице в бд
spark.catalog.listColumns("us_delay_flights_tbl")

[Column(name='date', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

### Чтение и запись данных из файлов Parquet, CSV, JSON,

In [None]:
#Задача 6 Создать неуправляемую таблицу и посмотреть Parquet файл напрямую с помощью SQL

#чтение файла
spark.sql("CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl \
           USING parquet \
           OPTIONS (path '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet')")
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)
spark.sql("DROP TABLE us_delay_flights_tbl")

#запись файла
df_jfk.write.format("parquet").mode("overwrite").option("compression", "snappy").save("/content/LearningSparkV2/tmp/parquet/")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [None]:
#Задача 7 Создать неуправляемую таблицу и посмотреть JSON файл напрямую с помощью SQL

#чтение файла
spark.sql("CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl \
           USING json \
           OPTIONS (path '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/json/*')")

#вывод файла
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)
spark.sql("DROP TABLE us_delay_flights_tbl")

#запись файла
df_jfk.write.format("JSON").mode("overwrite").option("compression", "snappy").save("/content/LearningSparkV2/tmp/JSON/")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [None]:
#Задача 8 Создать неуправляемую таблицу и посмотреть CSV файл напрямую с помощью SQL

#чтение файла
spark.sql("CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl \
           USING csv \
           OPTIONS \
           (path '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/csv', \
            header 'true', \
            inferSchema 'true', \
            mode 'FAILFAST')")

#вывод файла
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)
spark.sql("DROP TABLE us_delay_flights_tbl")

#запись файла
df_jfk.write.format("CSV").mode("overwrite").option("header", "true").save("/content/LearningSparkV2/tmp/CSV/")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [None]:
#Задача 9 Создать неуправляемую таблицу и посмотреть ORC файл напрямую с помощью SQL
spark.sql("CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl \
             USING orc \
             OPTIONS (path '/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/orc/2010-summary.orc')")

#вывод файла
spark.sql("SELECT * FROM us_delay_flights_tbl").show(5)
spark.sql("DROP TABLE us_delay_flights_tbl")

#запись файла
df_jfk.write.format("orc").mode("overwrite").save("/content/LearningSparkV2/tmp/orc/")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### Использование функций

In [None]:
#Задача 10 Создать функцию, которая будет датафрейм и выполнять обработку

#импортируем типы их нужно указать в регистраторе функции
from pyspark.sql.types import LongType

def cud(s):
  return s * s * s

#регистратор функции
spark.udf.register("cud", cud, LongType())

#создаем датафрейм с данными
spark.range(1,9).createOrReplaceTempView("udf_test")

#используем функцию
spark.sql("SELECT id, cud(id) as hz FROM udf_test").show()

spark.sql("SELECT array_join(array('hello','world'), ' пися ')").show()


+---+---+
| id| hz|
+---+---+
|  1|  1|
|  2|  8|
|  3| 27|
|  4| 64|
|  5|125|
|  6|216|
|  7|343|
|  8|512|
+---+---+

+---------------------------------------+
|array_join(array(hello, world),  пися )|
+---------------------------------------+
|                       hello пися world|
+---------------------------------------+



In [None]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def cubed(a: pd.Series) ->pd.Series:
  return a * a * a

cubed_udf = pandas_udf(cubed, returnType=LongType())
x = pd.Series([1, 2, 3])
print(cubed(x))

0     1
1     8
2    27
dtype: int64


#### Функции высшего порядка

In [None]:
#Пример для использования функций
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
t_c.show(2, False)

+----------------------------+
|celsius                     |
+----------------------------+
|[35, 36, 32, 30, 40, 42, 38]|
|[31, 32, 34, 55, 56]        |
+----------------------------+



In [None]:
#transform() создает массив и передает функцию на каждый элемент массива
spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit\
             FROM tC""").show(2 , False)

+----------------------------+-------------------------------+
|celsius                     |fahrenheit                     |
+----------------------------+-------------------------------+
|[35, 36, 32, 30, 40, 42, 38]|[95, 96, 89, 86, 104, 107, 100]|
|[31, 32, 34, 55, 56]        |[87, 89, 93, 131, 132]         |
+----------------------------+-------------------------------+



In [None]:
#filter() создает массив только из элементов соответствующим булевому значению
spark.sql("""
SELECT celsius,
 filter(celsius, t -> t > 36) as high
 FROM tC
""").show(5, False)

+----------------------------+------------+
|celsius                     |high        |
+----------------------------+------------+
|[35, 36, 32, 30, 40, 42, 38]|[40, 42, 38]|
|[31, 32, 34, 55, 56]        |[55, 56]    |
+----------------------------+------------+



In [None]:
#exists() возвращает true если логическая функция выполняется для любого элемента входного массива
spark.sql("""SELECT celsius, exists(celsius, t -> t = 38) as threshold\
           FROM tC""").show(3, False)

+----------------------------+---------+
|celsius                     |threshold|
+----------------------------+---------+
|[35, 36, 32, 30, 40, 42, 38]|true     |
|[31, 32, 34, 55, 56]        |false    |
+----------------------------+---------+



#### Задача
* Импортируйте два файла и создайте два DataFrame, один для информации об аэропорте (airportsna).
и один для задержек рейсов в США (departureDelays)

* Используя expr(), преобразуйте столбцы задержки и расстояния из STRING в INT.

* Создать небольшую таблицу содержащую только информацию о трёх рейсах, вылетающих из Сиэтла (SEA) в пункт назначения.
изменение Сан-Франциско (SFO) на небольшой временной диапазон.


In [5]:
#импорт библиотек
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import expr

# Create spark session with hive enabled (изменен порядок)
spark = SparkSession.builder \
    .appName("Hive options in PySpark") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.1") \
    .enableHiveSupport() \
    .getOrCreate()


#ссылки на датафреймы
airportsnaFILE = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

departureFILE = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

#считываем датафреймы
#информации об аэропорте
airportsna = (spark.read
                    .format("csv")
                    .options(header = 'True', inferSchema='True', sep = "\t")
                    .load(airportsnaFILE))
airportsna.show(5)
airportsna.createOrReplaceTempView("airports_na")

#информации о задержке рейсов
departure = (spark.read
                  .format("csv")
                  .options(header='true', inferSchema='True', sep = ",")
                  .load(departureFILE))
departure.show(5)

#поменяем формат столбцов
departure = (departure
                       .withColumn("delay", expr("CAST(delay as INT) as delay"))
                       .withColumn("distance", expr("CAST(distance as INT) as distance")))

departure.createOrReplaceTempView("departureDelays")

+----------+-----+-------+----+
|      City|State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|   BC| Canada| YXX|
|  Aberdeen|   SD|    USA| ABR|
|   Abilene|   TX|    USA| ABI|
|     Akron|   OH|    USA| CAK|
|   Alamosa|   CO|    USA| ALS|
+----------+-----+-------+----+
only showing top 5 rows

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 5 rows



In [None]:
#Создаем временную таблицу
foo = (departure
 .filter(expr("""origin == 'SEA' and destination == 'SFO' and
 date like '1010%' and delay > 0""")))

foo.createOrReplaceTempView("foo")
foo.show(5)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1010710|   31|     590|   SEA|        SFO|
|1010955|  104|     590|   SEA|        SFO|
|1010730|    5|     590|   SEA|        SFO|
+-------+-----+--------+------+-----------+



##### Задача 2. Перевести строки в столбцы с помощью Pivot



In [None]:
#Выведем данные
spark.sql("""SELECT destination, CAST(SUBSTRING(date, 0, 1) AS int) AS month, delay
             FROM departureDelays
             WHERE origin = 'SEA'
             ORDER BY month desc""").show(5)

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        JFK|    3|   -6|
|        ORD|    3|    3|
|        DFW|    3|   -2|
|        MIA|    3|    0|
|        DFW|    3|   -8|
+-----------+-----+-----+
only showing top 5 rows



In [None]:
#В столбце month хранятся месяцы, которые надо перенести в столбцы
spark.sql("""SELECT *
             FROM (SELECT destination, CAST(SUBSTRING(date, 0, 1) AS int) AS month, delay
                   FROM departureDelays
                   WHERE origin = 'SEA')
             PIVOT (CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
                    FOR month IN (1 JAN, 2 FEB, 3 MAR))
                   """).show(5)

+-----------+------------+------------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|MAR_AvgDelay|MAR_MaxDelay|
+-----------+------------+------------+------------+------------+------------+------------+
|        GEG|        2.28|          63|        2.87|          60|        4.49|          89|
|        BUR|       -2.03|          56|       -1.89|          78|        2.01|         108|
|        SNA|       -3.58|          82|       -0.41|          90|       -1.12|         107|
|        OAK|       15.82|         385|        8.12|         150|        7.75|         130|
|        DCA|       -1.15|          50|        0.07|          34|        5.73|         199|
+-----------+------------+------------+------------+------------+------------+------------+
only showing top 5 rows



## 9. Построение Lake house на базе pyspark

Настройка Apache Spark with Delta Lake


Изменим все операции чтения и записи DataFrame - операции для использования формата («дельта») вместо формата («паркет»). Сделаем эти манипуляции с данными о кредите, которые доступны в виде файла Parquet и сохраним их как таблицу Delta Lake.

Перед началом выполнения заданий, необходимо убить нынешнюю сессию и установить версию спарка и дельты, которые будут интегрированы с друг другом.

In [None]:
!git clone https://github.com/databricks/LearningSparkV2.git

Cloning into 'LearningSparkV2'...
remote: Enumerating objects: 1720, done.[K
remote: Counting objects: 100% (1720/1720), done.[K
remote: Compressing objects: 100% (1036/1036), done.[K
remote: Total 1720 (delta 546), reused 1691 (delta 541), pack-reused 0 (from 0)[K
Receiving objects: 100% (1720/1720), 76.97 MiB | 6.86 MiB/s, done.
Resolving deltas: 100% (546/546), done.
Updating files: 100% (768/768), done.


In [None]:
!pip install pyspark==3.4.1 delta-spark==2.4.0

Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting delta-spark==2.4.0
  Downloading delta_spark-2.4.0-py3-none-any.whl.metadata (1.9 kB)
Downloading delta_spark-2.4.0-py3-none-any.whl (20 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285391 sha256=78f2161b70b4d3b9c0a65e3e96ca0f7e3c593a39426cb7b5a74fa7149bec06e0
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark, delta-spark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.3
    Uninstalling pyspark-3.5.3:
      Successfully uninstalle

In [None]:
#внесем доп. настройки в конфиг
!pyspark --packages io.delta:delta-core_2.12:2.4.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog <<-EOF



In [None]:
#создаем папку
!mkdir /content/LearningSparkV2/databricks-datasets/learning-spark-v2/loans_delta

In [None]:
#spark.stop()
#импорт библиотеки
import pyspark
from delta import *

#открываем сессию
bild = pyspark.sql.SparkSession.builder \
    .appName("Hive options in PySpark") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")


spark = configure_spark_with_delta_pip(bild).getOrCreate()


#исходные данные
sourcePath = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"

#ссылка на Delta Lake path
deltaPath = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/loans_delta"

df = spark.read.format("parquet").load(sourcePath)

#Запись в формат дельта
df.write.format("delta").mode("overwrite").save(deltaPath)
#читаем таблицу
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

In [None]:
#Теперь к ней можно обратиться как к таблице в реляционной бд
spark.sql("SELECT * FROM loans_delta WHERE addr_state = 'OR' OR addr_state = 'WA' order by addr_state").show(500)

### Настройка схемы таблицы data lake

Перед тем как писать данные в таблицы delta lake, напишем структурную схему таблицы, если файлы которые записываются в таблицу delta lake будут иметь другую схему , то выводится ошибка - чтобы ее не было необходимо добавлять параметр ("mergeSchema", "true")

In [None]:
#создаем папку
!mkdir /content/LearningSparkV2/databricks-datasets/learning-spark-v2/loans_delta_2

In [None]:
from pyspark.sql.functions import *

cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
(1111111, 1000, 1000.0, 'TX', True),
(2222222, 2000, 0.0, 'CA', False)
]

loanUpdates = (spark.createDataFrame(items, cols)
                    .withColumn("funded_amnt", col("funded_amnt").cast("int")))

#ссылка на Delta Lake path
deltaPath_2 = "/content/LearningSparkV2/databricks-datasets/learning-spark-v2/loans_delta_2"

(loanUpdates.write.format("delta").mode("overwrite")
 .option("mergeSchema", "true")
 .save(deltaPath_2))


### Преобразование существующих данных в рамках Delta Lake.

Необходимо обновить Адрес штата OR на WA

In [None]:
from delta.tables import *

#обращение к таблице
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})


Удалим ячейки где funded_amnt = paid_amnt

In [None]:
#обращение к таблице
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt = paid_amnt")

Делаем UPSERT с помощью merge.

In [None]:
(deltaTable
 .alias("t")
 .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())

Сделаем аудит изменений таблицы

In [None]:
#deltaTable.history().show(3,False)
(deltaTable
 .history(3)
 .select("version", "timestamp", "operation", "operationParameters")
 .show(truncate=False))

+-------+-----------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationParameters                                                                                                                                                                     |
+-------+-----------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|3      |2024-10-24 12:05:55.076|MERGE    |{predicate -> ["(loan_id#2305L = loan_id#741L)"], matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}], notMatchedBySourcePredicates -> []}|
|2      |2024-10-24 12:05:43.385|DELETE   |{predicate -> ["(cast(funded_amnt#2306 as

In [None]:
#Версионирование в рамках delta lake
(spark.read.format("delta")
.option("versionAsOf", "1")
.load(deltaPath)).show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
|      5|       1000|    408.6|        CA|
|      6|       1000|   1000.0|        MD|
|      7|       1000|   168.81|        OH|
|      8|       1000|   193.64|        TX|
|      9|       1000|   218.83|        CT|
|     10|       1000|   322.37|        NJ|
|     11|       1000|   400.61|        NY|
|     12|       1000|   1000.0|        FL|
|     13|       1000|   165.88|        NJ|
|     14|       1000|    190.6|        TX|
|     15|       1000|   1000.0|        OH|
|     16|       1000|   213.72|        MI|
|     17|       1000|   188.89|        MI|
|     18|       1000|   237.41|        CA|
|     19|       1000|   203.85|        CA|
+-------+--

## 10. Machine Learning with MLlib

In [3]:
#импорт библиотеки
from pyspark.sql import SparkSession

#открываем сессию
spark = SparkSession.builder \
    .appName("Hive options in PySpark") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.1") \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
filePath = """/content/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"""
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
 "number_of_reviews", "price").show(5)


+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



In [5]:
#поделим датафрейм на части
#оставим 80% для обучающего набора и 20 для тестового набора
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set,
and {testDF.count()} in the test set""")

There are 5780 rows in the training set,
and 1366 in the test set


In [6]:
#построим линейную регрессию для прогнозирования цены с учетом количестова спален
#создаем вектор с помощью преобразователя асемблера
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows



In [8]:
#После настройки вектора ассемблера готовим и преобразуем данные в формат, который ожидает модель линейной регрессии
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

LinearRegressionModel: uid=LinearRegression_0d607577432a, numFeatures=1