In [43]:
import os
# Base directory for the data files read later in this notebook: the kernel's
# current working directory at the moment this cell runs. The CSV location is
# built by appending a relative "/data/..." suffix to this value.
path = os.getcwd()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import max, desc

In [34]:
# Obtain the Spark session for this notebook, registered under the
# application name "github_csv".
spark = (
    SparkSession.builder
    .appName("github_csv")
    .getOrCreate()
)

In [35]:
# Build a one-column DataFrame ("number") from spark.range(1000), then keep
# only the rows whose value is divisible by two.
myRange = spark.range(1000).toDF("number")
divisBy2 = myRange.where("number % 2 = 0")

# Last expression of the cell, so its value (the first matching row) is what
# the notebook displays. The former bare `myRange` line was dropped: a
# mid-cell expression is evaluated and discarded, never rendered.
divisBy2.head()

Row(number=0)

In [38]:
# Read the 2015 flight summary CSV, treating the first line as a header and
# letting Spark infer the column types.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(path + "/data/flight-data/csv/2015-summary.csv")
)

# Register the DataFrame so it can be queried by name from spark.sql(...).
flightData2015.createOrReplaceTempView("flight_data_2015")

In [39]:
# The same per-destination row count, expressed two ways so the resulting
# physical plans can be compared side by side.

# 1) As a SQL query against the "flight_data_2015" temp view.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

# 2) With the DataFrame API.
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

# Print both plans: SQL version first, DataFrame version second.
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#132, 200), ENSURE_REQUIREMENTS, [id=#208]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#132] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/duong/OneDrive/Documents/GitHub/Self-study-Spark-The-Definitive-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#132, 200), ENSURE_REQUIREMENTS, [id=#227]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#132] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/duong/OneDrive/Documents/GitHub/Self-study-

In [41]:
# Largest value in the "count" column, returned as a one-element list of Rows.
# The aggregate is referenced through the `F` namespace so it is unmistakably
# Spark's column function rather than Python's builtin max(), which the
# notebook's `from pyspark.sql.functions import max` shadows.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [42]:
# Top five destination countries by summed "count", computed with Spark SQL
# against the "flight_data_2015" temp view.
top_destinations_query = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
maxSql = spark.sql(top_destinations_query)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [44]:
# DataFrame-API version of the "top five destinations" aggregation:
# sum "count" per destination, rename the generated "sum(count)" column,
# order descending, keep five rows and print them.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [45]:
# Print the physical plan for the DataFrame-API "top five destinations"
# pipeline (group, sum, rename, order descending, limit 5) instead of
# executing it.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#217L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#132,destination_total#217L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[sum(cast(count#134 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#132, 200), ENSURE_REQUIREMENTS, [id=#354]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#132], functions=[partial_sum(cast(count#134 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#132,count#134] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/duong/OneDrive/Documents/GitHub/Self-study-Spark-The-Definitive-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


