In [None]:
%delete_livy_session --cluster dataproc-course --id lesson-4.2

In [None]:
%create_livy_session --cluster dataproc-course --id lesson-4.2 --conf spark.cores.max=1 --conf spark.executor.memory=2g --conf spark.driver.memory=2g

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2

df = spark.read.option("header", True).option("inferSchema", True).csv('s3a://yc-dataproc-tasks/data/transaction_data.csv')

In [None]:
#   ===============    Обзор полученных данных   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

# Получаем массив n строк
for i in df.take(2):
    print(i)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

df = spark.read.option("header", True).csv('s3a://yc-dataproc-tasks/data/transaction_data.csv')

# Показываем 10 строк в обрезанном состоянии
df.show(10)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

# Выбирает из датафрейма n верхних строк
df.limit(5).show(truncate = False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

# Показываем 10 строк в обрезанном состоянии
df.limit(10).collect()

In [None]:
#   ===============    Изучение структуры данных   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

# Выбирает из датафрейма n верхних строк
for column_name in df.columns:
    print(f"Column '{column_name}'")

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

print("Выводим список атрибутов с помощью метода dtypes")
print(df.dtypes)
print("\nВыводим список атрибутов с помощью метода schema")
print(df.schema)

df_fields = df.schema.fields

print("\nПреобразовываем все атрибуты с типом IntegerType в DoubleType")
new_schema = StructType([StructField(field.name, DoubleType(), field.nullable) if field.dataType == IntegerType() else field for field in df_fields])

print(new_schema)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

df.printSchema()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 

df = spark\
        .read\
        .option("inferSchema", True)\
        .option("header", True)\
        .csv('s3a://yc-dataproc-tasks/data/transaction_data.csv')

# Выбирает из датафрейма n верхних строк
df.printSchema()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import col,lit
from datetime import datetime

current_datetime = datetime.now()

df.withColumn('processed_dttm', lit(current_datetime)).show(5, False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2

df.select("UserId", "Country").show(5, False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2
df.select("Country").distinct().show()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([ \
      StructField("UserId", IntegerType()), \
      StructField("TransactionId", IntegerType()), \
      StructField("TransactionTime", StringType()), \
      StructField("ItemCode", IntegerType()), \
      StructField("ItemDescription", StringType()), \
      StructField("NumberOfItemsPurchased", StringType()), \
      StructField("CostPerItem", StringType()), \
      StructField("Country", StringType()) \
    ])
        
df = spark\
        .read\
        .option("header", True)\
        .schema(schema)\
        .csv('s3a://yc-dataproc-tasks/data/transaction_data.csv')
    
df.printSchema()

In [None]:
#   ===============    Манипуляция со столбцами   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2
from pyspark.sql.functions import col, lit

# Выбор столбцов
print("Передаем на вход массив атрибутов")
df.select(df.columns).show(2, False)

print("Атрибуты по именам")
df.select("UserId", "Country").show(2, False)

print("Используем объект Column")
df.select((col("UserId") * 100).alias("UserId"), col("Country"), lit("hello World").alias("greetings")).show(2, False)

print("Обращение к атрибуту")
df.select(df.ItemCode, (df.TransactionId - 1000000).alias("newTransaction")).show(2, False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Выбор столбцов
df\
    .dropDuplicates(["Country"])\
    .selectExpr("UserId",\
                "date_format(to_timestamp(TransactionTime, 'E MMM dd HH:mm:ss z yyyy'), 'HH:mm:ss.SSS yyyy/MMM/dd') as timestamp",\
                "concat(Country, ' test') as cntr",\
                "'hello world!' as greeting"
               )\
    .show(5, False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2
# Добавление и изменение атрибутов
from pyspark.sql.functions import when, lower, regexp_replace, length, expr

df\
    .select("ItemCode", "Country")\
    .dropDuplicates(["Country"])\
    .withColumn("spaceCount", when(length(regexp_replace(col("Country"), "\S", "")) == 1, "Single Space")
                .when(length(regexp_replace(col("Country"), "\S", "")) > 1, "Multiple Space")
                .otherwise("No spaces")
               )\
    .withColumn("ItemCode", expr("case when ItemCode > 450000 then ItemCode + 500000 else round(sqrt(ItemCode), 2) end"))\
    .show(30, truncate = False)


In [None]:
#   ===============    Манипуляция со строками датафрейма   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2
# Сортировка
from pyspark.sql.functions import col

df_without_duplicates = df.select("UserId", "TransactionId", "ItemCode").dropDuplicates(["UserId"])

df_without_duplicates.sort(col("UserId")).show(10)
print("Сортировка с использованием sort")
df_without_duplicates.sort(col("UserId").desc(), col("ItemCode").asc()).show(10)
print("Сортировка с использованием orderBy")
df_without_duplicates.orderBy(col("UserId").desc(), col("ItemCode").asc()).show(10)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2
# Фильтрация
from pyspark.sql.functions import col

item_df = df.select("ItemCode", "ItemDescription", "CostPerItem")

print("Фильтрация с использованием where")
item_df.where((col("CostPerItem") > 2.0) & (col("CostPerItem") < 3.0)).show(truncate = False)
print("Фильтрация с использованием filter")
item_df.filter((col("CostPerItem") > 2.0) & (col("ItemDescription").like("LUNCH%"))).show()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2

# Получение уникальных строк
user_df = df.select("UserId", "TransactionId", "Country")

print("Убираем полные дубли (по всем строкам)")
user_df.distinct().sort("UserId", "Country").show(10)
print("Убираем дубли только для атрибута UserId")
user_df.dropDuplicates(["UserId"]).sort("UserId").show(10)
print("Убираем дубли для атрибутов UserId и Country")
user_df.dropDuplicates(["UserId", "Country"]).sort("UserId").show(10)

In [None]:
#   ===============    Агрегаты и обогащение   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import avg, count, min, max, sum

# Получаем среднюю стоимость для каждого товара и количество купленных товаров
print("Получаем среднюю стоимость товара и сумму, сколько раз товар был куплен")
df.select("ItemCode", "CostPerItem", "NumberOfItemsPurchased")\
    .groupBy("ItemCode")\
    .agg(avg("CostPerItem").alias("AverageCost"), sum("NumberOfItemsPurchased").alias("CountOfPurchasedItems"))\
    .show(10, False)

# Получаем количество товаров, которое приобрел каждый пользователь
print("Получаем данные о том, сколько товаров приобрел каждый пользователь")
df.select("UserId", "ItemCode").distinct().groupBy("UserId").agg(count("*").alias("ItemCount")).show(10, False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import col, lit
from datetime import datetime

# Определяем текущую временную метку
current_datetime = datetime.now()

# Обогащаем датафрейм новым полем - информация о времени обработки датафрейма
df.select("UserId", "TransactionId", "ItemCode").withColumn('processed_dttm', lit(current_datetime)).show(5, False)

In [None]:
#   ===============    Функции   ===============

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import col, lower, upper, substring, split, trim, regexp_replace, length

# Функции для работы со строками
df\
    .dropDuplicates(["Country"])\
    .limit(10)\
    .select(
        "Country",\
        lower("Country").alias("lower"),\
        upper("Country").alias("upper"),\
        substring("Country", 1,3).alias("substring"),\
        split("Country", " ").alias("split"),
        trim("Country").alias("trim"),\
        regexp_replace("Country", "e", "EE").alias("regexpl_replace"),\
        length("Country").alias("length")\
    ).show()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import abs, round, ceil, pow, sqrt

# Функции для работы с числами

df\
    .dropDuplicates(["CostPerItem"])\
    .limit(15)\
    .select(\
        "CostPerItem",\
        abs("CostPerItem").alias("abs"),\
        ceil("CostPerItem").alias("ceil"),\
        round(pow("CostPerItem", 2), 3).alias("powWithRound"),\
        sqrt(abs("CostPerItem")).alias("sqrt")\
    ).sort("CostPerItem").show()

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import col, current_date, current_timestamp, dayofmonth, date_add, date_format, to_timestamp, trunc, month, year

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Функции для работы с датой и временем
df\
    .select(\
            col("TransactionTime"),\
            current_date().alias("CurrentDate"),\
            current_timestamp().alias("CurrentTimestamp"),\
            trunc(current_timestamp(), "month").alias("Trunc"),\
            to_timestamp(col("TransactionTime"), "E MMM dd HH:mm:ss z yyyy").alias("TransformedTranscation"),\
            date_format(date_add(current_timestamp(), 30), "dd/MM/yyyy").alias("DateAdd")\
           )\
    .withColumn("year", year("TransformedTranscation"))\
    .withColumn("month", month("TransformedTranscation"))\
    .withColumn("day", dayofmonth("TransformedTranscation"))\
    .show(10, truncate = False)

In [None]:
#!spark --cluster dataproc-course --session lesson-4.2 
from pyspark.sql.functions import array_contains, array_join, array_sort, col, size, split

# Функции для работы с массивами
df\
    .select(col("Country"), split(col("Country"), " ").alias("splittedCountry"))\
    .distinct()\
    .sort(col("Country").desc())\
    .select(col("Country"),\
            col("splittedCountry"),\
            array_contains(col("splittedCountry"), "United").alias("contains"),\
            size(col("splittedCountry")).alias("size"),\
            array_join(array_sort(col("splittedCountry")), "_").alias("join")\
           )\
    .show(20, False)