In [1]:
from pyspark.sql import SparkSession
# Obtain the SparkSession for this notebook: reuse the active one if it exists,
# otherwise create it with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [2]:
# Bare expression: as the last line of the cell, the session object is shown as the cell's output.
spark

In [3]:
# Single-column DataFrame holding the integers 0 through 999; the column is named "number".
myRange = (
    spark
    .range(1000)
    .toDF("number")
)

In [9]:
# Keep only the even values. This is a lazy transformation: nothing is computed
# until an action is called on divisBy2. (filter and where are aliases.)
divisBy2 = myRange.filter("number % 2 = 0")

In [10]:
# count() is an action: it runs the filter behind divisBy2 and returns the number of
# even values in 0..999 (500 in the recorded output).
divisBy2.count()

500

In [None]:
# An End-to-End Example

In [11]:
# Load the 2015 flight summary CSV. The header row supplies the column names and
# inferSchema asks Spark to work out each column's type from the data.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("../../data/flight-data/csv/2015-summary.csv")
)

In [13]:
# Bring the first three rows to the driver as a list of Row objects to inspect the data.
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [14]:
# Print the physical plan for sorting by "count" without running the sort.
# The Exchange step in the recorded plan range-partitions the rows by count.
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#22 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#22 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#20,ORIGIN_COUNTRY_NAME#21,count#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/dev/spark-the-definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [15]:
# Use 5 shuffle partitions for later shuffles (the plan recorded before this cell shows 200).
spark.conf.set("spark.sql.shuffle.partitions","5")
# Sort ascending by "count" and return the first two rows as a list of Row objects.
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [17]:
# Print the sort plan again; in the recorded output the Exchange step now uses 5 partitions.
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#22 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#22 ASC NULLS FIRST, 5)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#20,ORIGIN_COUNTRY_NAME#21,count#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/dev/spark-the-definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [21]:
# DataFrames and SQL

In [None]:
# Register the DataFrame under the name "flight_data_2015" so spark.sql queries can reference it.
# Run this cell before any query against that view.
flightData2015.createOrReplaceTempView("flight_data_2015")

In [22]:

# The same aggregation written twice: once as a SQL query against the temp view...
sqlWay = spark.sql("""
  select dest_country_name, count(1)
  from flight_data_2015
  group by dest_country_name
""")

# ...and once with the DataFrame API.
dataFrameWay = (
    flightData2015
    .groupBy("dest_country_name")
    .count()
)

# Print both physical plans so they can be compared side by side
# (they are the same in the recorded output).
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[dest_country_name#20], functions=[count(1)])
+- Exchange hashpartitioning(dest_country_name#20, 5)
   +- *(1) HashAggregate(keys=[dest_country_name#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/dev/spark-the-definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[dest_country_name#20], functions=[count(1)])
+- Exchange hashpartitioning(dest_country_name#20, 5)
   +- *(1) HashAggregate(keys=[dest_country_name#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/dev/spark-the-definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

In [25]:
# SQL form: largest value in the "count" column, returned as a one-element list of Row.
spark.sql("select max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [26]:
# Import the functions module under an alias instead of `from ... import max`.
# A bare `max` would replace Python's built-in max() for every later cell in this
# kernel, so something like max([1, 2]) would then fail.
from pyspark.sql import functions as F

# DataFrame form of `select max(count)`: the result column is still named
# "max(count)" and take(1) returns a one-element list of Row.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [30]:
# SQL form of the "top five destinations" query: sum "count" per destination country,
# order by that sum descending, and keep the first five rows.
maxSql = spark.sql("""
  select dest_country_name, sum(count) as destination_total
  from flight_data_2015
  group by dest_country_name
  order by sum(count) desc
  limit 5
""")
# show() is the action that runs the query and prints the result as a text table.
maxSql.show()

+-----------------+-----------------+
|dest_country_name|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [34]:
from pyspark.sql.functions import desc
# DataFrame form of the top-five query: sum "count" per destination, rename the
# generated "sum(count)" column, sort largest first, keep five rows and print them.
(
    flightData2015
    .groupBy("dest_country_name")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|dest_country_name|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [36]:
from pyspark.sql.functions import desc
# Same top-five pipeline, ending in explain() instead of show(): this prints the
# physical plan rather than running the query.
(
    flightData2015
    .groupBy("dest_country_name")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#228L DESC NULLS LAST], output=[dest_country_name#20,destination_total#228L])
+- *(2) HashAggregate(keys=[dest_country_name#20], functions=[sum(cast(count#22 as bigint))])
   +- Exchange hashpartitioning(dest_country_name#20, 5)
      +- *(1) HashAggregate(keys=[dest_country_name#20], functions=[partial_sum(cast(count#22 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#20,count#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/c:/dev/spark-the-definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
