# Подготовка

## Конфигурация ширины

Настраиваем ширину вывода таблиц NumPy и pandas, чтобы строки не переносились без надобности, но помещались на экран.

In [None]:
# How many characters fit on one line of your notebook?
width = 100
# Print a ruler of exactly `width` characters to check it visually.
print('A'*width)

In [None]:
import numpy as np

# Let NumPy use up to `width` characters per printed line.
np.set_printoptions(linewidth=width)

# Sample output to check the wrapping.
np.arange(width)

In [None]:
import pandas as pd
# Both pandas limits are set 10 characters below `width`.
pd.options.display.width = width - 10
pd.options.display.max_colwidth = width - 10
# One cell holding a string of width-11 characters, to check the column limit.
pd.DataFrame([{'col': 'A' * (width-11)}])

## Установка

In [None]:
# в colab можно ставить на систему, в jhub.jinr.ru - не разрешит
!apt-get install -q openjdk-8-jdk-headless

In [None]:
# качаем спарк
!wget -c https://apache-mirror.rbc.ru/pub/apache/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
# Unpack the archive unless it has already been extracted.
!if [ ! -d spark-3.2.1-bin-hadoop3.2 ]; then tar -xzf spark-3.2.1-bin-hadoop3.2.tgz; fi

In [None]:
pip install findspark # on jhub: pip install --user findspark

## Запуск

In [None]:
import os

# Needed for Spark to start.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Needed for the script to find the Spark libraries.
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

# On jhub:
# os.environ["JAVA_HOME"] = "/zfs/store5.hydra.local/user/i/ikadochn/java_v8u181"
# os.environ["SPARK_HOME"] = f"{os.getcwd()}/spark-3.2.1-bin-hadoop3.2"

In [None]:
import findspark
# Presumably reads SPARK_HOME set above to make pyspark importable - confirm in the findspark docs.
findspark.init()

In [None]:
# Session for the newer (SQL-like) Spark API.
from pyspark.sql import SparkSession
# "local[4]" is the master URL handed to the builder.
spark = SparkSession.builder.master("local[4]").getOrCreate()
spark

In [None]:
import pyspark
# Alternatively: set the master through a SparkConf and obtain a SparkContext.
conf = pyspark.conf.SparkConf()
conf.setMaster("local[4]")
sc = pyspark.SparkContext.getOrCreate(conf)
sc

# Spark DataFrame

## Создание из локальных данных

In [None]:
from pyspark.sql import Row

# Build a DataFrame from Row objects; no explicit schema is given.
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1'),
    Row(a=2, b=3., c='string2'),
    Row(a=4, b=5., c='string3')
])

df.show()

In [None]:
# Print the query plan of the DataFrame.
df.explain()

In [None]:
# Same call with the extended flag set.
df.explain(True)

In [None]:
help(df.explain)

In [None]:
# Build the DataFrame from plain tuples plus a schema given as a string.
df = spark.createDataFrame([
    (1, 2., 'string1'),
    (2, 3., 'string2'),
    (3, 4., 'string3')
], schema='a long, b double, c string')

df.show()

In [None]:
df.select("a").explain(True)

In [None]:
# Try passing a dict of columns; any exception is printed instead of raised.
try:
  df = spark.createDataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
  })
  df.show()
except Exception as e:
  print(e)

In [None]:
# The same column-oriented data, wrapped in a pandas DataFrame first.
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
})
df = spark.createDataFrame(pandas_df)
df.show()

In [None]:
df

In [None]:
# Hand a local list to the SparkContext.
sc.parallelize([1,2,3])

In [None]:
# Parallelize a list of tuples, then build a DataFrame from it with column names only.
rdd = sc.parallelize([
    (1, 2., 'string1'),
    (2, 3., 'string2'),
    (3, 4., 'string3')
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c'])
df.show()
rdd

## Просмотр, схема, сбор на клиенте

In [None]:
# Print only the first row.
df.show(1)

In [None]:
# Print two rows in vertical layout.
df.show(2, vertical=True)

In [None]:
# Column names.
df.columns

In [None]:
df.printSchema()

In [None]:
df.schema

In [None]:
# Both calls are asked for two rows; shown side by side for comparison.
df.take(2), df.head(2)

In [None]:
# Collect through the DataFrame's rdd attribute ...
df.rdd.collect()

In [None]:
# ... and directly from the DataFrame.
df.collect()

In [None]:
# Convert to a pandas DataFrame.
df.toPandas()

In [None]:
# Compare the Spark schema with what pandas reports after conversion.
df.printSchema()
df.toPandas().info()

In [None]:
# Summary statistics computed by pandas ...
df.toPandas().describe()

In [None]:
# ... and by Spark.
df.describe().show()

In [None]:
df.summary().show()

# SQL?

## Столбцы датафрейма

In [None]:
# Select columns by name.
df.select('a', 'c').show()

In [None]:
# The commented-out line below compares the Python string 'a' with 3 before
# Spark is involved, so it cannot work; a Column expression is used instead.
# df.filter('a'<3).show()
df.filter(df.a < 3).show()
df.filter(2 * df.a == df.b).show()

In [None]:
# Attribute access on a DataFrame gives a Column object.
df.a

In [None]:
# Columns can be passed to select as objects instead of names.
df.select(df.a, df.c)

In [None]:
from pyspark.sql.functions import upper

# Add a column named upper_c computed as upper(c).
df.withColumn('upper_c', upper(df.c)).show()

In [None]:
# Select an arithmetic expression over two columns.
df.select(df.a * df.b).show()

In [None]:
# qq is a separate DataFrame that also has a column named a.
qq = spark.createDataFrame([
    ('x', 'a'),
    ('y', 'b'),
], schema='a string, z string')
# Try to select qq's column from df; any exception is printed instead of raised.
try:
  df.select(qq.a)
except Exception as e:
  print(e)

In [None]:
# Alias the same column twice, then filter through an attribute of the new DataFrame.
q = df.select(df.a.alias('a1'), df.a.alias('a2'), 'c')
q.show()
q.filter(q.a1<3).show()

In [None]:
from pyspark.sql.functions import col
# col() builds a Column from a name alone; no DataFrame appears in the expression.
col('q')

In [None]:
df.select(col('a')).show()

In [None]:
# With col() the alias can be referenced in the same chain, without an intermediate variable.
df.select(df.a.alias('a1'), df.a.alias('a2'), 'c').where(col('a1')<3).show()

In [None]:
# Compare column b with the string literal 'a'.
df.filter(col('b') < 'a').show()

## Юнион (конкатенация по строкам)

In [None]:
# pandas frame with three x columns and a label column y.
train = pd.DataFrame({"x1": range(3),
                   "y": ["a", "b", "a"],
                   "x2": reversed(range(3)),
                   "x3": 0})
train

In [None]:
# Same x columns as train, but without y.
test = pd.DataFrame({"x1": [2, 3],
                   "x2": [1, 2],
                   "x3": 0})
test

In [None]:
# pandas concatenation of the two frames with a fresh index.
pd.concat([train, test], ignore_index=True)

In [None]:
tr = spark.createDataFrame(train)
te = spark.createDataFrame(test)
# Try a Spark union of frames with different column sets; any exception is printed.
try:
  tr.union(te).show()
except Exception as e:
  print(e)

In [None]:
from pyspark.sql.functions import lit
# Add a y column holding lit(None) to te before the union.
# NOTE(review): y is second in tr but added after te's existing columns here -
# check how union matches columns before trusting this output.
tr.union(te.withColumn('y', lit(None))).show()

In [None]:
te.columns

In [None]:
# Put the lit(None) placeholder in the second position, where tr has y.
te.select(te.x1, lit(None), 'x2', col('x3')).show()

In [None]:
tr.union(te.select(te.x1, lit(None), 'x2', col('x3'))).show()

In [None]:
# Alternative: drop y from tr so both sides have the same columns.
tr.drop('y').union(te).show()

In [None]:
tr.drop('y').union(te).explain(True)

## Join

In [None]:
# pandas table of users with a name and an id.
user = pd.DataFrame({"name": ["admin", "guest"],
                      "id": [1, 123]})
user

In [None]:
n = 10
# Random log of n events; uid is drawn from the same values as user.id.
log = pd.DataFrame({"uid": np.random.choice([1, 123], n),
                    "result": np.random.choice(["done", "error"], n),
                    "time": np.arange(n)})
log

In [None]:
# pandas join: match log.uid against user's index, which is set to id.
log.join(user.set_index('id'), on='uid')

In [None]:
users = spark.createDataFrame(user)
logs = spark.createDataFrame(log)
# Spark join on a shared column name, after renaming id to uid.
users.withColumnRenamed('id', 'uid').join(logs, 'uid').show()

In [None]:
# Spark join on an explicit equality condition between differently named columns.
users.join(logs, users.id==logs.uid).show()

## UDF - user-defined functions

In [None]:
from pyspark.sql.functions import pandas_udf

@pandas_udf('bigint')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return *series* with 1 added to every element."""
    incremented = series + 1
    return incremented

# Apply the UDF to column a and display the result.
df.select(pandas_plus_one(df["a"])).show()

In [None]:
# Build one pandas_plus_one column expression per column of df.
[pandas_plus_one(col(c)) for c in df.columns]

In [None]:
from pyspark.sql.types import StringType
# typeName() is called on the class itself, not on an instance.
StringType.typeName()

In [None]:
@pandas_udf('string')
def pandas_to_str(series: pd.Series) -> pd.Series:
    """Convert the values to text and pad them on the left with '_' to width 10."""
    as_text = series.astype('str')
    return as_text.str.pad(width=10, side='left', fillchar='_')

# Apply the conversion to every column of df.
df.select(*(pandas_to_str(col(name)) for name in df.columns)).show()

In [None]:
def pandas_filter_func(iterator):
    """Lazily yield each incoming pandas batch reduced to the rows where a == 1."""
    yield from (batch[batch["a"] == 1] for batch in iterator)

# Run pandas_filter_func over df; the output is declared with df's own schema.
df.mapInPandas(pandas_filter_func, schema=df.schema).show()

## Группировка данных

In [None]:
# Small table with two string columns (color, fruit) and two numeric ones (v1, v2).
fruit = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
fruit.show()

In [None]:
# Group by color and average; avg() is called without naming columns.
fruit.groupby('color').avg().show()

In [None]:
def plus_mean(pandas_df):
    """Return a new frame whose v1 column has its own mean subtracted.

    Despite the name, the mean is subtracted, not added. All other columns
    are passed through unchanged and the input frame is not modified.
    """
    centered = pandas_df["v1"] - pandas_df["v1"].mean()
    return pandas_df.assign(v1=centered)

# Apply plus_mean to each color group; the output is declared with fruit's schema.
# NOTE(review): plus_mean produces float v1 values while fruit's v1 was built
# from Python ints - confirm the conversion back to the declared schema is intended.
fruit.groupby('color').applyInPandas(plus_mean, schema=fruit.schema).show()

## SQL

In [None]:
# Register fruit under the name "tableA" so that SQL queries can refer to it.
fruit.createOrReplaceTempView("tableA")
spark.sql("SELECT * from tableA").show()

In [None]:
# Plan of the SQL query through the DataFrame API ...
spark.sql("SELECT * from tableA").explain()

In [None]:
# ... and through an EXPLAIN statement, whose first result row is printed.
print(spark.sql("EXPLAIN SELECT * from tableA").collect()[0])

In [None]:
spark.sql("SELECT count(*) from tableA").show()

In [None]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    """Return *s* incremented by one, element-wise."""
    result = s + 1
    return result

# Make the UDF callable from SQL under the name "add_one", then use it in a query.
spark.udf.register(name="add_one", f=add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

In [None]:
from pyspark.sql.functions import expr

# SQL expression strings inside the DataFrame API; add_one is referred to by its registered name.
fruit.selectExpr('add_one(v1)').show()
# expr() turns a SQL string into a Column, which is then compared with 0.
fruit.select(expr('count(*)') > 0).show()

# Ввод-вывод

## Запись и чтение результата

In [None]:
rm -r foo.csv

In [None]:
# Write df as CSV with a header row; the rm cell before this one clears earlier output.
# NOTE: the bare rm/ls/cat cells rely on IPython automagic and are kept as
# single-line cells on purpose.
df.write.csv('foo.csv', header=True)

In [None]:
ls -lh foo.csv

In [None]:
# Number of partitions of df.
df.rdd.getNumPartitions()

In [None]:
cat foo.csv/part-00001-*.csv

In [None]:
# Read back a single part file ...
spark.read.csv('foo.csv/part-00001*.csv', header=True).show()

In [None]:
# ... and the whole foo.csv output.
spark.read.csv('foo.csv', header=True).show()

In [None]:
# Hex dump of the same part file.
!hd foo.csv/part-00001*.csv

In [None]:
  df.repartition(3).rdd.getNumPartitions()

In [None]:
rm -r bar.parquet

In [None]:
# Write df as Parquet after repartitioning into 3; the rm cell before this one clears earlier output.
df.repartition(3).write.parquet('bar.parquet')

In [None]:
ls -lh bar.parquet

In [None]:
# Hex dump of one part file.
!hd bar.parquet/part-00000-*.parquet

In [None]:
# Read back a single part file ...
spark.read.parquet('bar.parquet/part-00000-*.parquet').show()

In [None]:
# ... and the whole bar.parquet output.
spark.read.parquet('bar.parquet').show()

## Текстовый формат

In [None]:
# Read a text file; the cells below use the resulting column named "value".
textFile = spark.read.text("./spark-3.2.1-bin-hadoop3.2/README.md")

In [None]:
textFile.repartition(4).explain()

In [None]:
textFile

In [None]:
# Number of rows.
textFile.count()

In [None]:
textFile.first()

In [None]:
# Keep the rows whose value contains the substring "Spark".
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
linesWithSpark

In [None]:
# truncate=False so that the strings are shown in full.
linesWithSpark.show(truncate=False)

In [None]:
linesWithSpark.rdd.getNumPartitions()

In [None]:
rm -r spark.txt

In [None]:
# Save the filtered rows as text.
linesWithSpark.write.text("spark.txt")

In [None]:
ls -lh spark.txt

In [None]:
!head spark.txt/part*.txt

In [None]:
# Save the same rows as CSV with a header; spark.csv is removed again a few cells later.
linesWithSpark.write.csv("spark.csv", header=True)

In [None]:
ls -lh spark.csv

In [None]:
!head spark.csv/part*.csv

In [None]:
rm -r spark.csv

## Кеширование

In [None]:
# Extended plan of a count over linesWithSpark, before cache() is called on it.
linesWithSpark.selectExpr("count(*)").explain(True)

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html

In [None]:
# Storage level before cache() ...
linesWithSpark.storageLevel

In [None]:
linesWithSpark.cache()

In [None]:
# ... and after it.
linesWithSpark.storageLevel

In [None]:
# Plan of the count again, now that cache() has been called.
linesWithSpark.selectExpr("count(*)").explain(True)

In [None]:
# Run an action on the DataFrame.
linesWithSpark.count()

In [None]:
# Plan once more, after the action has run.
linesWithSpark.selectExpr("count(*)").explain(True)

# Реальный датасет

In [None]:
# Read both CSV files with a header row; no schema or inference option is passed.
ch_train = spark.read.csv('/content/sample_data/california_housing_train.csv', header=True)
ch_test = spark.read.csv('/content/sample_data/california_housing_test.csv', header=True)

In [None]:
# Row counts of each part and their sum.
ch_train.count(), ch_test.count(), ch_train.count() + ch_test.count()

In [None]:
# Stack the two parts and count again.
ch = ch_train.union(ch_test)
ch.count()

In [None]:
ch.printSchema()

In [None]:
# Show a small random sample (fraction 0.001, fixed seed).
ch.sample(False, 0.001, seed=0).show()

In [None]:
# Cast every column to float with SQL expressions, listing the columns by hand;
# only the resulting schema is printed, ch itself is not reassigned.
ch.selectExpr("cast(longitude as float) longitude",
    "cast(latitude as float) latitude",
    "cast(housing_median_age as float) housing_median_age",
    "cast(total_rooms as float) total_rooms",
    "cast(total_bedrooms as float) total_bedrooms",
    "cast(population as float) population",
    "cast(households as float) households",
    "cast(median_income as float) median_income",
    "cast(median_house_value as float) median_house_value").printSchema()

In [None]:
from pyspark.sql.types import FloatType

# The same cast written generically over ch.columns.
ch.select(*[ch[c].cast(FloatType()) for c in ch.columns]).printSchema()

In [None]:
ch_train_location = "sample_data/california_housing_train.csv"
ch_test_location = "sample_data/california_housing_test.csv"

# Re-read both files, this time asking Spark to infer the column types
# (the inferSchema option) instead of relying on the reader's defaults.
reader = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .option("sep", ",")
)

ch_train = reader.load(ch_train_location)
ch_test = reader.load(ch_test_location)

# Rebuild the combined frame from the freshly read halves; otherwise `ch`
# would keep pointing at the union of the earlier read and the cells that
# follow would not see the inferred types.
ch = ch_train.union(ch_test)

ch_train.show(3)
ch_test.show(3, truncate=False)
ch_test.show(1, vertical=True)

In [None]:
# Schema of the training part.
ch_train.printSchema()

In [None]:
# Summary statistics of the combined frame ch.
ch.describe().show()

## Добавим признаков

In [None]:
# Replace median_house_value with its value divided by 100000.
ch = ch.withColumn("median_house_value", ch["median_house_value"]/100000)
ch.take(2)

In [None]:
# A ratio of two columns, shown on its own without selecting it.
ch["total_rooms"]/ch["households"]

In [None]:
# Selecting the ratio gives a DataFrame with that single column.
roomsPerHousehold = ch.select(ch["total_rooms"]/ch["households"])
roomsPerHousehold

In [None]:
# Add the derived ratios as new named columns, one per cell.
ch = ch.withColumn('rooms_per_household', ch["total_rooms"]/ch["households"])
ch.show(5)

In [None]:
ch = ch.withColumn('population_per_household', ch["population"]/ch["households"])
ch.show(5)

In [None]:
ch = ch.withColumn('bedrooms_per_room', ch["total_bedrooms"]/ch["total_rooms"])
ch.show(5)

In [None]:
# Keep only the listed columns, in this order, and preview the result.
ch = ch.select([
    "median_house_value",
    "total_bedrooms",
    "population",
    "households",
    "median_income",
    "rooms_per_household",
    "population_per_household",
    "bedrooms_per_room",
])

ch.show(5)

# Задание
1. Прочитать датасет пассажиров титаника в спарк
2. Вывести pie chart по количеству пассажиров в разных классах с помощью tempView и SQL
3. Найти среднюю выживаемость пассажиров по полу и классу с помощью tempView и SQL
4. Вывести pie chart по количеству пассажиров в разных классах методами спарка без tempView
5. Найти среднюю выживаемость пассажиров по полу и классу методами спарка без tempView

In [None]:
# Download the Titanic passenger CSV; -c continues an interrupted download.
!wget -c https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv

In [None]:
# Peek at the first lines of the file.
!head titanic.csv