In [2]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Day3Practice") \
    .getOrCreate()

spark


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/17 20:07:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = [
    (1, "Alice", 5, "P"),
    (2, "Bob", 5, "A"),
    (3, "Charlie", 6, "P"),
    (4, "David", 6, "P"),
    (5, "Eva", 5, "P"),
]

columns = ["student_id", "name", "grade", "attendance_code"]

df = spark.createDataFrame(data, columns)

df.show()


                                                                                

+----------+-------+-----+---------------+
|student_id|   name|grade|attendance_code|
+----------+-------+-----+---------------+
|         1|  Alice|    5|              P|
|         2|    Bob|    5|              A|
|         3|Charlie|    6|              P|
|         4|  David|    6|              P|
|         5|    Eva|    5|              P|
+----------+-------+-----+---------------+



In [4]:
df.filter(df.grade == 5).show()

+----------+-----+-----+---------------+
|student_id| name|grade|attendance_code|
+----------+-----+-----+---------------+
|         1|Alice|    5|              P|
|         2|  Bob|    5|              A|
|         5|  Eva|    5|              P|
+----------+-----+-----+---------------+



In [5]:
from pyspark.sql.functions import when, col

df_flagged = df.withColumn( "present_flag", when(col("attendance_code") == "P", 1).otherwise(0) )

df_flagged.show()

+----------+-------+-----+---------------+------------+
|student_id|   name|grade|attendance_code|present_flag|
+----------+-------+-----+---------------+------------+
|         1|  Alice|    5|              P|           1|
|         2|    Bob|    5|              A|           0|
|         3|Charlie|    6|              P|           1|
|         4|  David|    6|              P|           1|
|         5|    Eva|    5|              P|           1|
+----------+-------+-----+---------------+------------+



In [6]:
df_summary = df_flagged.groupBy("grade").sum("present_flag")

df_summary.show()

+-----+-----------------+
|grade|sum(present_flag)|
+-----+-----------------+
|    5|                2|
|    6|                2|
+-----+-----------------+



In [7]:
df_summary.explain()

df.rdd.getNumPartitions()

df_repartitioned = df.repartition(5)
df_repartitioned.rdd.getNumPartitions()

df_coalesced = df.coalesce(1)
df_coalesced.rdd.getNumPartitions()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[grade#2L], functions=[sum(present_flag#30)])
   +- Exchange hashpartitioning(grade#2L, 200), ENSURE_REQUIREMENTS, [plan_id=99]
      +- HashAggregate(keys=[grade#2L], functions=[partial_sum(present_flag#30)])
         +- Project [grade#2L, CASE WHEN (attendance_code#3 = P) THEN 1 ELSE 0 END AS present_flag#30]
            +- Scan ExistingRDD[student_id#0L,name#1,grade#2L,attendance_code#3]




1

In [10]:
df = df.filter(df.grade == 6)
df.show()

df_plus_bonus = df.withColumn("bonus", when(col("grade") == 6, 10).otherwise(0))
df_plus_bonus.show()

df_grouped_with_sum_of_bonus = df_plus_bonus.groupBy("grade").sum("bonus")
df_grouped_with_sum_of_bonus.show()

df_grouped_with_sum_of_bonus.explain()

+----------+-------+-----+---------------+
|student_id|   name|grade|attendance_code|
+----------+-------+-----+---------------+
|         3|Charlie|    6|              P|
|         4|  David|    6|              P|
+----------+-------+-----+---------------+

+----------+-------+-----+---------------+-----+
|student_id|   name|grade|attendance_code|bonus|
+----------+-------+-----+---------------+-----+
|         3|Charlie|    6|              P|   10|
|         4|  David|    6|              P|   10|
+----------+-------+-----+---------------+-----+

+-----+----------+
|grade|sum(bonus)|
+-----+----------+
|    6|        20|
+-----+----------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[grade#2L], functions=[sum(bonus#124)])
   +- Exchange hashpartitioning(grade#2L, 200), ENSURE_REQUIREMENTS, [plan_id=281]
      +- HashAggregate(keys=[grade#2L], functions=[partial_sum(bonus#124)])
         +- Project [grade#2L, CASE WHEN (grade#2L = 6) THEN 10 ELSE 0 EN

In [14]:
df_plus_bonus.rdd.getNumPartitions()
df_grouped_with_sum_of_bonus.rdd.getNumPartitions()

df_big = df.repartition(4)

df_big.rdd.getNumPartitions()



4

In [15]:
df_big.groupBy("grade").sum("student_id").explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[grade#2L], functions=[sum(student_id#0L)])
   +- Exchange hashpartitioning(grade#2L, 200), ENSURE_REQUIREMENTS, [plan_id=366]
      +- HashAggregate(keys=[grade#2L], functions=[partial_sum(student_id#0L)])
         +- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=362]
            +- Project [student_id#0L, grade#2L]
               +- Filter (isnotnull(grade#2L) AND (grade#2L = 6))
                  +- Scan ExistingRDD[student_id#0L,name#1,grade#2L,attendance_code#3]




In [16]:
df_big.groupBy("grade").sum("student_id").rdd.getNumPartitions()


1

In [17]:
spark.conf.get("spark.sql.shuffle.partitions")


'200'