In [2]:
try:
    import pyspark
    print("PySpark is installed!")
except ImportError:
    print("PySpark is not installed.")


PySpark is installed!


In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("BasicCountTest").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/01 10:44:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
flightdata2015 = spark.read.option("inferschema", "true").option("header", "true").csv("data/flight-data/csv/2015-summary.csv")

24/11/01 10:45:05 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
flightdata2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [7]:
flightdata2015.sort("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [8]:
flightdata2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/harshini/Documents/apache_spark_datavidhya/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [10]:
flightdata2015.sort("count").take(3)

[Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [11]:
flightdata2015.sort("count").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Estonia|    1|
|              Kosovo|      United States|    1|
|              Zambia|      United States|    1|
|       United States|   Papua New Guinea|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|            Suriname|      United States|    1|
|       United States|            Croatia|    1|
|            Djibouti|      United States|    1|
|        Burkina Faso|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|             Cyprus|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United State

#### Creating a table from the dataframe using createOrReplaceTempView, so that we can use the table for sql queries

In [12]:
flightdata2015.createOrReplaceTempView("flight_data_2015")

In [13]:
sqlWay = spark.sql("""
        SELECT DEST_COUNTRY_NAME, COUNT(1)
        FROM flight_data_2015
        GROUP BY DEST_COUNTRY_NAME
""")

In [14]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=67]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/harshini/Documents/apache_spark_datavidhya/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [16]:
dataFrameWay = flightdata2015\
                .groupBy("DEST_COUNTRY_NAME")\
                .count()

In [17]:
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=80]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/harshini/Documents/apache_spark_datavidhya/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




### It creates same plan for sql or for dataframes, regardless of the language, it does same transformation

In [18]:
#using sql queries 
spark.sql("SELECT MAX(COUNT) FROM flight_data_2015").take(1)

[Row(max(COUNT)=370002)]

In [19]:
# uisng functions 
from pyspark.sql.functions import max
flightdata2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [20]:
#using sql
maxsql = spark.sql(""" 
SELECT DEST_COUNTRY_NAME, sum(COUNT) as destination_total
from flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(COUNT) DESC
LIMIT 5
""")

In [21]:
maxsql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [26]:
#using functions
from pyspark.sql.functions import desc

flightdata2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("COUNT")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [27]:
flightdata2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("COUNT")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#162L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#162L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(COUNT#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 200), ENSURE_REQUIREMENTS, [plan_id=250]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(COUNT#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/harshini/Documents/apache_spark_datavidhya/data/flight-dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


