In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# создание сессии с помощью SparkSession


# метод .config() - при создании сессии, метод .set() - после создания


# НАСТРОЙКА СТАТИЧЕСКОЙ АЛЛОКАЦИИ
 # .set("spark.driver.memory", "2g")
 #            .set("spark.driver.cores", 2) #Задаем только в Cluster Mode  
 #            .set("spark.executor.cores", 5) 
 #            .set("spark.executor.instances", 3) 
 #            .set("spark.dynamicAllocation.enabled",'false')
 #            .set("spark.master", "yarn")
 #            .set("spark.submit.deploymode", "client")

# НАСТРОЙКА ДИНАМИЧЕСКОЙ АЛЛОКАЦИИ
            # .set("spark.driver.memory", "2g")
            # .set("spark.driver.cores", 2) #Задаем только в Cluster Mode 
            # .set("spark.executor.cores", 5) 
            # .set("spark.submit.deploymode", "client")
            # .set("spark.dynamicAllocation.enabled", "true")
            # .set("spark.dynamicAllocation.initialExecutors", "1")
            # .set("spark.dynamicAllocation.minExecutors", "0")
            # .set("spark.dynamicAllocation.maxExecutors", "3")
            # .set("spark.dynamicAllocation.executorIdleTimeout", "360s")

spark = SparkSession.builder.appName('Practice').getOrCreate()

spark.sql("select 'hello spark'").collect()[0][0]

In [None]:
# создание ссесии с помощью SparkContext

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local").setAppName("newSession")

sc = SparkContext(conf=conf)

spark.sql("select 'hello spark'").collect()[0][0]

In [None]:
spark

In [None]:
# конвертирую excel в csv

import pandas as pd

df_excel = pd.read_excel('cvm_srv.xlsx')

df_excel.to_csv('cvm_srv.csv', index=False)

In [None]:
# чтение фрейма данных

# InferSchema - автоматическое определение типов данных при чтении файла

df_spark = spark.read.option('header', 'true').csv('cvm_srv.csv', inferSchema=True)

In [None]:
# вывод результата

df_spark.show(1)

In [None]:
# вывод схемы

df_spark.printSchema()

In [None]:
# получение колонок

df_spark.columns

In [None]:
# получение кортежа из строк

df_spark.head(1)[0]

In [None]:
# Выборка по полю

df_spark.select('regid', 'dayy').show()

# df_spark['regid', 'dayy'].show()

#df.regid

In [None]:
# Добавление поля

df_spark = df_spark.withColumn("new_column", df_spark['regid']+1)

In [None]:
# добавление поля с константным выражением

from pyspark.sql.functions import lit

df_spark = df_spark.withColumn("const_column", lit(500))

In [None]:
# Удаление поля

df_spark = df_spark.drop('new_column')

In [None]:
# Переименование поля

df_spark.withColumnRenamed("regid", "regid_new")

In [None]:
# удаление строк с пустыми полями

# drop (how = как удалять, subset = из каких полей, thresh = какая то мера удаления)

df_spark.na.drop()

In [None]:
# чем заполнить пробелы (Null)

df_spark.na.fill('missing_values',['name_cvm'])

In [None]:
# заполнить пробелы средним значением

# можно выбрать также mod, median

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ["regid","dayy","new_column"],
                 outputCols = ["{}_imputed".format(c) for c in ["regid","dayy","new_column"]]).setStrategy("mean") 

imputer.fit(df_spark).transform(df_spark).show()

In [None]:
# фильтры

df_spark.filter("regid=68").show()

df_spark.filter(df_spark['regid']>68).show()

In [None]:

# фильтры с условием И (&) и ИЛИ (|)

df_spark.filter((df_spark['regid']>68) & (df_spark['dayy'] == 70)).show()

df_spark.filter((df_spark['regid']>68) | (df_spark['dayy'] == 70)).show()

In [None]:
# фильтры с условием НЕ (~)

df_spark.filter(~(df_spark['regid']>68)).show()

In [None]:
# Фильтр like

df_spark.filter(df_spark['global_code'].like('PE5%')).show()

In [None]:
# создание DataFrame (самостоятельно)

data = [('Lucy', 10, 3_000),('Tanya', 35, 200_000), ('Kolya', 15, 0)]

df = spark.createDataFrame(data, ['name', 'age', 'money'])


In [None]:
# Группировка

df_spark.printSchema()

In [None]:
# Группировка
# у группировки много есть агрегирующих функций (sum, min, max и тд.) Если не указывать ничего внутри функции, 
# он будет суммировать все поля числовые, если указать внутри функции, то только их

df_spark.groupBy('global_code').sum('service', 'dayy')

# df_spark.groupBy('global_code').sum()

In [None]:
# среднее и подсчет элементов

df_spark.groupBy('global_code').mean().show()

df_spark.groupBy('global_code').count().show()

In [None]:
# агрегирующая функция (два варианта применения)
from pyspark.sql.functions import sum

df_spark.groupBy("global_code").agg({'regid':'sum'}).show()

df_spark.groupBy("global_code").agg(sum('regid').alias('cnt_day')).show()

In [None]:
# сортировка 


# df_spark.sort(df_spark['dayy'].asc()).show()

df_spark.orderBy(df_spark['dayy'].asc()).show()


In [None]:
# сохранить датафрейм в памяти компьютера (для операций ускорения action операций)

df_spark.cache()

In [None]:
# MEMORY_ONLY,MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2,MEMORY_AND_DISK_2
# позволяет контролировать, где сохранится датафрейм, по умолчанию метод  MEMORY_AND_DISK

df_spark.persist()

In [None]:
# применения функции col (для выделения полей отдельной функцией)

from pyspark.sql.functions import col

df_spark.filter((col('global_code').like('PE%')) & \
         (col('regid') > 68)).show(10)

In [None]:
# JOIN в pypsark. В параметр how можно передать тип join-a 

# "inner" (внутреннее): возвращает строки, которые есть в обоих DataFrame.
# "outer" (внешнее) или "full" (полное): возвращает все строки из обоих DataFrame.
# "left" (левое): возвращает все строки из левого DataFrame и совпадающие строки из правого DataFrame.
# "right" (правое): возвращает все строки из правого DataFrame и совпадающие строки из левого DataFrame.
# "semi" (полу): возвращает строки из левого DataFrame, где есть совпадения в правом DataFrame.
# "anti" (анти): возвращает строки из левого DataFrame, где нет совпадений в правом DataFrame.


df1 = spark.createDataFrame([(1, "John", "Chicago"),
                            (2, "Mike", "New York"),
                            (3, "Sue", "Washington")], ["Id", "Name", "City"])

df2 = spark.createDataFrame([(1, "Blue"),
                            (2, "Red"),
                            (4, "Green")], ["Id", "Color"])

# Присоединяем df2 к df1 по "Id"
df3 = df1.join(df2, on="Id", how="inner").select('Name')

# df3 = df1.join(df2, df1.Id == df2.UserId, how="inner")

df3.show()

In [None]:
# Приведение типов данных 
from pyspark.sql.functions import col
from pyspark.sql.types import StringType


df_spark_string = df_spark.select(col('dayy').cast(StringType()))

In [None]:
# получить статистику по таблице

df_spark.describe().show()

In [None]:
# получить уникальный фрейм данных 

df_spark.distinct()