# pyspark2
Читаем данные с hdfs и делаем джоин двух таблиц.   
Результат сохраняем в один файл с сортировкой по возрастанию цены.

In [None]:
import os
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

os.environ["HADOOP_CONF_DIR"]="/etc/hadoop/conf"
# os.environ["SPARK_HOME"]="/usr/hdp/current/spark2-client"
# os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_191/jre"

spark = pyspark.sql.SparkSession.builder\
    .master("yarn")\
    .appName("v_alehin_pyspark2")\
    .config("spark.executor.instances", "1")\
    .config("spark.executor.memory", "1G")\
    .config("spark.executor.cores", "2")\
    .config("spark.dynamicAllocation.enabled", "false")\
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")\
    .config("spark.dynamicAllocation.maxExecutors", "1000")\
    .config("spark.driver.memory", "1G")\
    .config("spark.driver.maxResultSize", "1G")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "1024m")\
    .getOrCreate()

spark_context = spark.sparkContext

In [None]:
print("""
Urls:
Yarn       http://91.219.226.252:8088/cluster/scheduler
Spark      http://91.219.226.252:8088/proxy/{app_id}/stages/
App info   http://91.219.226.252:8088/cluster/app/{app_id}/
""".format(app_id=spark_context.applicationId))

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as sf

user = ""  # your login

product_path = "/user/{}/data/data1/shop_product.csv".format(user)
price_path = "/user/{}/data/data1/shop_price.csv".format(user)
product_join_price_path = "/user/{}/data/pyspark2".format(user)


In [None]:
sourceProductDF = (spark.read
                   .option("header", "false")
                   .option("sep", "\t")
                   .csv(product_path)
                  )

In [None]:
sourceProductDF.printSchema()
sourceProductDF.show()

In [None]:
sourcePriceDF = (spark.read
                 .option("header", "false")
                 .option("sep", ";")
                 .csv(price_path)
                )

In [None]:
sourcePriceDF.printSchema()
sourcePriceDF.show()

In [None]:
# Приводим типы и задаем названия полей.
productDF = (
    sourceProductDF
    .select(
        sf.col("_c0").cast(IntegerType()).alias("product_id"),
        sf.col("_c1").alias("description")
    )
)

priceDF = (
    sourcePriceDF
    .select(
        sf.col("_c0").cast(IntegerType()).alias("product_id"),
        sf.col("_c1").cast(DoubleType()).alias("price")
    )
)


In [None]:
productDF.printSchema()
productDF.show()

priceDF.printSchema()
priceDF.show()

In [None]:
# Делаем inner-join
resultDF = (
    productDF
    .join(priceDF, productDF.product_id == priceDF.product_id, how='inner')
)


In [None]:
# Спарк сам по себе ничего не запоминает, 
# в примере ниже для расчета 2 раза используется count, 
# все расчеты будут выполнены 2 раза.
print "1, Count: {cnt}".format(cnt=resultDF.count())
print "2, Count/2: {cnt}".format(cnt=resultDF.count()/2)

In [None]:
# Правильно было бы записать результат в переменную и использовать ее
# Это позволит избежать 2-х расчетов count
cnt = resultDF.count()
print "1, Count: {cnt}".format(cnt=cnt)
print "2, Count/2: {cnt}".format(cnt=cnt/2)


In [None]:
resultDF.show()
cnt = resultDF.count()
print "Count: {cnt}".format(cnt=cnt)

In [None]:
# Делаем inner-join и убираем дублирующую колонку
resultDF = (
    productDF
    .join(priceDF, productDF.product_id == priceDF.product_id, how='inner')
    .select(
        productDF.product_id,
        sf.col("description"),
        sf.col("price")
    )
)


In [None]:
resultDF.show()
cnt = resultDF.count()
print "Count: {cnt}".format(cnt=cnt)


# Для примера выведем строки с product_id равным 3 и 7.
(
    resultDF
    .where((sf.col("product_id") == 3) | (sf.col("product_id") == 7))
    .show()
)

In [None]:
# Сохраняем результат. mode("overwrite") позволяет перезаписывать результат.
# Стоит обратить внимание, что мы сначала скидываем все данные в одну партицию (repartition) и только затем сортируем.
(resultDF
 .repartition(1)
 .sortWithinPartitions(sf.col("price").desc())
 .write
 .mode("overwrite")
 .option("header", "true")
 .option("sep", "\t")
 .csv(product_join_price_path)
)

In [None]:
# После всех экспериментов собираем итоговый "чистый" код:
sourceProductDF = (spark.read
                   .option("header", "false")
                   .option("sep", "\t")
                   .csv(product_path)
                  )

sourcePriceDF = (spark.read
                 .option("header", "false")
                 .option("sep", ";")
                 .csv(price_path)
                )

productDF = (
    sourceProductDF
    .select(
        sf.col("_c0").alias("product_id"),
        sf.col("_c1").alias("description")
    )
)

priceDF = (
    sourcePriceDF
    .select(
        sf.col("_c0").alias("product_id"),
        sf.col("_c1").cast(DoubleType()).alias("price")
    )
)

resultDF = (
    productDF
    .join(priceDF, productDF.product_id == priceDF.product_id, how='inner')
    .select(
        productDF.product_id,
        sf.col("description"),
        sf.col("price")
    )
)


(resultDF
 .repartition(1)
 .sortWithinPartitions(sf.col("price").asc())
 .write
 .mode("overwrite")
 .option("header", "true")
 .option("sep", "\t")
 .csv(product_join_price_path)
)

In [None]:
# После работы обязательно отключаем спарк и отдаем ресурсы!
spark.stop()